You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ac...@apache.org on 2013/03/15 22:26:48 UTC

svn commit: r1457129 [38/38] - in /incubator/tez: ./ tez-ampool/ tez-ampool/src/ tez-ampool/src/main/ tez-ampool/src/main/bin/ tez-ampool/src/main/conf/ tez-ampool/src/main/java/ tez-ampool/src/main/java/org/ tez-ampool/src/main/java/org/apache/ tez-am...

Added: incubator/tez/tez-yarn-client/.classpath
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-client/.classpath?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-client/.classpath (added)
+++ incubator/tez/tez-yarn-client/.classpath Fri Mar 15 21:26:36 2013
@@ -0,0 +1,105 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+	<classpathentry including="**/*.java" kind="src" path="src/main/java"/>
+	<classpathentry excluding="**/*.java" kind="src" path="src/main/resources"/>
+	<classpathentry kind="var" path="M2_REPO/javax/activation/activation/1.1/activation-1.1.jar"/>
+	<classpathentry kind="var" path="M2_REPO/javax/inject/javax.inject/1/javax.inject-1.jar"/>
+	<classpathentry kind="var" path="M2_REPO/javax/xml/bind/jaxb-api/2.2.2/jaxb-api-2.2.2.jar"/>
+	<classpathentry kind="var" path="M2_REPO/javax/servlet/jsp/jsp-api/2.1/jsp-api-2.1.jar"/>
+	<classpathentry kind="var" path="M2_REPO/javax/servlet/servlet-api/2.5/servlet-api-2.5.jar"/>
+	<classpathentry kind="var" path="M2_REPO/aopalliance/aopalliance/1.0/aopalliance-1.0.jar"/>
+	<classpathentry kind="var" path="M2_REPO/asm/asm/3.1/asm-3.1.jar"/>
+	<classpathentry kind="var" path="M2_REPO/org/apache/avro/avro/1.5.3/avro-1.5.3.jar"/>
+	<classpathentry kind="var" path="M2_REPO/commons-beanutils/commons-beanutils/1.7.0/commons-beanutils-1.7.0.jar"/>
+	<classpathentry kind="var" path="M2_REPO/commons-beanutils/commons-beanutils-core/1.8.0/commons-beanutils-core-1.8.0.jar"/>
+	<classpathentry kind="var" path="M2_REPO/commons-cli/commons-cli/1.2/commons-cli-1.2.jar"/>
+	<classpathentry kind="var" path="M2_REPO/commons-codec/commons-codec/1.4/commons-codec-1.4.jar"/>
+	<classpathentry kind="var" path="M2_REPO/commons-collections/commons-collections/3.2.1/commons-collections-3.2.1.jar"/>
+	<classpathentry kind="var" path="M2_REPO/org/apache/commons/commons-compress/1.4/commons-compress-1.4.jar"/>
+	<classpathentry kind="var" path="M2_REPO/commons-configuration/commons-configuration/1.6/commons-configuration-1.6.jar"/>
+	<classpathentry kind="var" path="M2_REPO/commons-digester/commons-digester/1.8/commons-digester-1.8.jar"/>
+	<classpathentry kind="var" path="M2_REPO/commons-el/commons-el/1.0/commons-el-1.0.jar"/>
+	<classpathentry kind="var" path="M2_REPO/commons-httpclient/commons-httpclient/3.1/commons-httpclient-3.1.jar"/>
+	<classpathentry kind="var" path="M2_REPO/commons-io/commons-io/2.1/commons-io-2.1.jar"/>
+	<classpathentry kind="var" path="M2_REPO/commons-lang/commons-lang/2.5/commons-lang-2.5.jar"/>
+	<classpathentry kind="var" path="M2_REPO/commons-logging/commons-logging/1.1.1/commons-logging-1.1.1.jar"/>
+	<classpathentry kind="var" path="M2_REPO/org/apache/commons/commons-math/2.1/commons-math-2.1.jar"/>
+	<classpathentry kind="var" path="M2_REPO/commons-net/commons-net/3.1/commons-net-3.1.jar"/>
+	<classpathentry kind="var" path="M2_REPO/com/google/guava/guava/11.0.2/guava-11.0.2.jar"/>
+	<classpathentry kind="var" path="M2_REPO/com/google/inject/guice/3.0/guice-3.0.jar"/>
+	<classpathentry kind="var" path="M2_REPO/com/google/inject/extensions/guice-assistedinject/3.0/guice-assistedinject-3.0.jar"/>
+	<classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-annotations/3.0.0-SNAPSHOT/hadoop-annotations-3.0.0-SNAPSHOT.jar"/>
+	<classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-auth/3.0.0-SNAPSHOT/hadoop-auth-3.0.0-SNAPSHOT.jar"/>
+	<classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-common/3.0.0-SNAPSHOT/hadoop-common-3.0.0-SNAPSHOT.jar" sourcepath="M2_REPO/org/apache/hadoop/hadoop-common/3.0.0-SNAPSHOT/hadoop-common-3.0.0-SNAPSHOT-sources.jar">
+		<attributes>
+			<attribute name="javadoc_location" value="jar:file:/Users/acmurthy/.m2/repository/org/apache/hadoop/hadoop-common/3.0.0-SNAPSHOT/hadoop-common-3.0.0-SNAPSHOT-javadoc.jar!/"/>
+		</attributes>
+	</classpathentry>
+	<classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-mapreduce-client-common/3.0.0-SNAPSHOT/hadoop-mapreduce-client-common-3.0.0-SNAPSHOT.jar" sourcepath="M2_REPO/org/apache/hadoop/hadoop-mapreduce-client-common/3.0.0-SNAPSHOT/hadoop-mapreduce-client-common-3.0.0-SNAPSHOT-sources.jar">
+		<attributes>
+			<attribute name="javadoc_location" value="jar:file:/Users/acmurthy/.m2/repository/org/apache/hadoop/hadoop-mapreduce-client-common/3.0.0-SNAPSHOT/hadoop-mapreduce-client-common-3.0.0-SNAPSHOT-javadoc.jar!/"/>
+		</attributes>
+	</classpathentry>
+	<classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-mapreduce-client-core/3.0.0-SNAPSHOT/hadoop-mapreduce-client-core-3.0.0-SNAPSHOT.jar" sourcepath="M2_REPO/org/apache/hadoop/hadoop-mapreduce-client-core/3.0.0-SNAPSHOT/hadoop-mapreduce-client-core-3.0.0-SNAPSHOT-sources.jar">
+		<attributes>
+			<attribute name="javadoc_location" value="jar:file:/Users/acmurthy/.m2/repository/org/apache/hadoop/hadoop-mapreduce-client-core/3.0.0-SNAPSHOT/hadoop-mapreduce-client-core-3.0.0-SNAPSHOT-javadoc.jar!/"/>
+		</attributes>
+	</classpathentry>
+	<classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-mapreduce-client-shuffle/3.0.0-SNAPSHOT/hadoop-mapreduce-client-shuffle-3.0.0-SNAPSHOT.jar" sourcepath="M2_REPO/org/apache/hadoop/hadoop-mapreduce-client-shuffle/3.0.0-SNAPSHOT/hadoop-mapreduce-client-shuffle-3.0.0-SNAPSHOT-sources.jar">
+		<attributes>
+			<attribute name="javadoc_location" value="jar:file:/Users/acmurthy/.m2/repository/org/apache/hadoop/hadoop-mapreduce-client-shuffle/3.0.0-SNAPSHOT/hadoop-mapreduce-client-shuffle-3.0.0-SNAPSHOT-javadoc.jar!/"/>
+		</attributes>
+	</classpathentry>
+	<classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-yarn-api/3.0.0-SNAPSHOT/hadoop-yarn-api-3.0.0-SNAPSHOT.jar" sourcepath="M2_REPO/org/apache/hadoop/hadoop-yarn-api/3.0.0-SNAPSHOT/hadoop-yarn-api-3.0.0-SNAPSHOT-sources.jar">
+		<attributes>
+			<attribute name="javadoc_location" value="jar:file:/Users/acmurthy/.m2/repository/org/apache/hadoop/hadoop-yarn-api/3.0.0-SNAPSHOT/hadoop-yarn-api-3.0.0-SNAPSHOT-javadoc.jar!/"/>
+		</attributes>
+	</classpathentry>
+	<classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-yarn-common/3.0.0-SNAPSHOT/hadoop-yarn-common-3.0.0-SNAPSHOT.jar" sourcepath="M2_REPO/org/apache/hadoop/hadoop-yarn-common/3.0.0-SNAPSHOT/hadoop-yarn-common-3.0.0-SNAPSHOT-sources.jar">
+		<attributes>
+			<attribute name="javadoc_location" value="jar:file:/Users/acmurthy/.m2/repository/org/apache/hadoop/hadoop-yarn-common/3.0.0-SNAPSHOT/hadoop-yarn-common-3.0.0-SNAPSHOT-javadoc.jar!/"/>
+		</attributes>
+	</classpathentry>
+	<classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-yarn-server-nodemanager/3.0.0-SNAPSHOT/hadoop-yarn-server-nodemanager-3.0.0-SNAPSHOT.jar" sourcepath="M2_REPO/org/apache/hadoop/hadoop-yarn-server-nodemanager/3.0.0-SNAPSHOT/hadoop-yarn-server-nodemanager-3.0.0-SNAPSHOT-sources.jar">
+		<attributes>
+			<attribute name="javadoc_location" value="jar:file:/Users/acmurthy/.m2/repository/org/apache/hadoop/hadoop-yarn-server-nodemanager/3.0.0-SNAPSHOT/hadoop-yarn-server-nodemanager-3.0.0-SNAPSHOT-javadoc.jar!/"/>
+		</attributes>
+	</classpathentry>
+	<classpathentry kind="var" path="M2_REPO/org/hamcrest/hamcrest-core/1.3/hamcrest-core-1.3.jar"/>
+	<classpathentry kind="var" path="M2_REPO/org/codehaus/jackson/jackson-core-asl/1.8.8/jackson-core-asl-1.8.8.jar"/>
+	<classpathentry kind="var" path="M2_REPO/org/codehaus/jackson/jackson-jaxrs/1.7.1/jackson-jaxrs-1.7.1.jar"/>
+	<classpathentry kind="var" path="M2_REPO/org/codehaus/jackson/jackson-mapper-asl/1.8.8/jackson-mapper-asl-1.8.8.jar"/>
+	<classpathentry kind="var" path="M2_REPO/org/codehaus/jackson/jackson-xc/1.7.1/jackson-xc-1.7.1.jar"/>
+	<classpathentry kind="var" path="M2_REPO/tomcat/jasper-compiler/5.5.23/jasper-compiler-5.5.23.jar"/>
+	<classpathentry kind="var" path="M2_REPO/tomcat/jasper-runtime/5.5.23/jasper-runtime-5.5.23.jar"/>
+	<classpathentry kind="var" path="M2_REPO/com/sun/xml/bind/jaxb-impl/2.2.3-1/jaxb-impl-2.2.3-1.jar"/>
+	<classpathentry kind="var" path="M2_REPO/com/sun/jersey/jersey-core/1.8/jersey-core-1.8.jar"/>
+	<classpathentry kind="var" path="M2_REPO/com/sun/jersey/jersey-json/1.8/jersey-json-1.8.jar"/>
+	<classpathentry kind="var" path="M2_REPO/com/sun/jersey/jersey-server/1.8/jersey-server-1.8.jar"/>
+	<classpathentry kind="var" path="M2_REPO/net/java/dev/jets3t/jets3t/0.6.1/jets3t-0.6.1.jar"/>
+	<classpathentry kind="var" path="M2_REPO/org/codehaus/jettison/jettison/1.1/jettison-1.1.jar"/>
+	<classpathentry kind="var" path="M2_REPO/org/mortbay/jetty/jetty/6.1.26/jetty-6.1.26.jar"/>
+	<classpathentry kind="var" path="M2_REPO/org/mortbay/jetty/jetty-util/6.1.26/jetty-util-6.1.26.jar"/>
+	<classpathentry kind="var" path="M2_REPO/jline/jline/0.9.94/jline-0.9.94.jar"/>
+	<classpathentry kind="var" path="M2_REPO/com/jcraft/jsch/0.1.42/jsch-0.1.42.jar"/>
+	<classpathentry kind="var" path="M2_REPO/com/google/code/findbugs/jsr305/1.3.9/jsr305-1.3.9.jar"/>
+	<classpathentry kind="var" path="M2_REPO/junit/junit/4.11/junit-4.11.jar"/>
+	<classpathentry kind="var" path="M2_REPO/log4j/log4j/1.2.17/log4j-1.2.17.jar"/>
+	<classpathentry kind="var" path="M2_REPO/org/jboss/netty/netty/3.2.2.Final/netty-3.2.2.Final.jar"/>
+	<classpathentry kind="var" path="M2_REPO/com/thoughtworks/paranamer/paranamer/2.3/paranamer-2.3.jar"/>
+	<classpathentry kind="var" path="M2_REPO/com/google/protobuf/protobuf-java/2.4.0a/protobuf-java-2.4.0a.jar"/>
+	<classpathentry kind="var" path="M2_REPO/org/slf4j/slf4j-api/1.6.1/slf4j-api-1.6.1.jar"/>
+	<classpathentry kind="var" path="M2_REPO/org/slf4j/slf4j-log4j12/1.6.1/slf4j-log4j12-1.6.1.jar"/>
+	<classpathentry kind="var" path="M2_REPO/org/xerial/snappy/snappy-java/1.0.3.2/snappy-java-1.0.3.2.jar"/>
+	<classpathentry kind="var" path="M2_REPO/stax/stax-api/1.0.1/stax-api-1.0.1.jar"/>
+	<classpathentry kind="src" path="/tez-api"/>
+	<classpathentry kind="src" path="/tez-common"/>
+	<classpathentry kind="src" path="/tez-engine"/>
+	<classpathentry kind="src" path="/tez-mapreduce"/>
+	<classpathentry kind="var" path="M2_REPO/xmlenc/xmlenc/0.52/xmlenc-0.52.jar"/>
+	<classpathentry kind="var" path="M2_REPO/org/tukaani/xz/1.0/xz-1.0.jar"/>
+	<classpathentry kind="var" path="M2_REPO/org/apache/zookeeper/zookeeper/3.4.2/zookeeper-3.4.2.jar"/>
+	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
+	<classpathentry kind="src" path="/hadoop-yarn-client"/>
+	<classpathentry kind="output" path="target/classes"/>
+</classpath>

Added: incubator/tez/tez-yarn-client/.project
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-client/.project?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-client/.project (added)
+++ incubator/tez/tez-yarn-client/.project Fri Mar 15 21:26:36 2013
@@ -0,0 +1,18 @@
+<projectDescription>
+  <name>tez-yarn-client</name>
+  <comment>NO_M2ECLIPSE_SUPPORT: Project files created with the maven-eclipse-plugin are not supported in M2Eclipse.</comment>
+  <projects>
+    <project>tez-api</project>
+    <project>tez-common</project>
+    <project>tez-engine</project>
+    <project>tez-mapreduce</project>
+  </projects>
+  <buildSpec>
+    <buildCommand>
+      <name>org.eclipse.jdt.core.javabuilder</name>
+    </buildCommand>
+  </buildSpec>
+  <natures>
+    <nature>org.eclipse.jdt.core.javanature</nature>
+  </natures>
+</projectDescription>
\ No newline at end of file

Added: incubator/tez/tez-yarn-client/pom.xml
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-client/pom.xml?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-client/pom.xml (added)
+++ incubator/tez/tez-yarn-client/pom.xml Fri Mar 15 21:26:36 2013
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.tez</groupId>
+    <artifactId>tez</artifactId>
+    <version>0.1.0</version>
+  </parent>
+  <artifactId>tez-yarn-client</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-common</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-mapreduce</artifactId>
+    </dependency>
+  </dependencies>
+
+</project>

Added: incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java (added)
+++ incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+
+/**
+ * Caches one {@link ClientServiceDelegate} per JobID, sharing a single
+ * lazily-created proxy to the MapReduce job history server among them.
+ */
+public class ClientCache {
+
+  private final Configuration conf;
+  private final ResourceMgrDelegate rm;
+
+  private static final Log LOG = LogFactory.getLog(ClientCache.class);
+
+  // One delegate per job; entries are never evicted (see TODO on getClient).
+  private Map<JobID, ClientServiceDelegate> cache = 
+      new HashMap<JobID, ClientServiceDelegate>();
+
+  // Shared history-server proxy; stays null until first successful creation,
+  // or permanently when no history address is configured.
+  private MRClientProtocol hsProxy;
+
+  public ClientCache(Configuration conf, ResourceMgrDelegate rm) {
+    this.conf = conf;
+    this.rm = rm;
+  }
+
+  //TODO: evict from the cache on some threshold
+  /**
+   * Returns the cached delegate for {@code jobId}, creating it on first use.
+   *
+   * @throws YarnException if the history-server proxy cannot be instantiated
+   *         (wraps the underlying IOException)
+   */
+  public synchronized ClientServiceDelegate getClient(JobID jobId) {
+    if (hsProxy == null) {
+      try {
+        hsProxy = instantiateHistoryProxy();
+      } catch (IOException e) {
+        LOG.warn("Could not connect to History server.", e);
+        throw new YarnException("Could not connect to History server.", e);
+      }
+    }
+    ClientServiceDelegate client = cache.get(jobId);
+    if (client == null) {
+      client = new ClientServiceDelegate(conf, rm, jobId, hsProxy);
+      cache.put(jobId, client);
+    }
+    return client;
+  }
+
+  /**
+   * Returns the shared history-server proxy, creating it if needed.  May
+   * return null when no history address is configured (see
+   * instantiateHistoryProxy).
+   */
+  protected synchronized MRClientProtocol getInitializedHSProxy()
+      throws IOException {
+    if (this.hsProxy == null) {
+      hsProxy = instantiateHistoryProxy();
+    }
+    return this.hsProxy;
+  }
+  
+  /**
+   * Creates an MRClientProtocol proxy to the history server named by
+   * JHAdminConfig.MR_HISTORY_ADDRESS, or returns null when that address is
+   * unset or empty.  The proxy is created as the current UGI user.
+   */
+  protected MRClientProtocol instantiateHistoryProxy()
+      throws IOException {
+    final String serviceAddr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS);
+    if (StringUtils.isEmpty(serviceAddr)) {
+      return null;
+    }
+    LOG.debug("Connecting to HistoryServer at: " + serviceAddr);
+    final YarnRPC rpc = YarnRPC.create(conf);
+    LOG.debug("Connected to HistoryServer at: " + serviceAddr);
+    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+    return currentUser.doAs(new PrivilegedAction<MRClientProtocol>() {
+      @Override
+      public MRClientProtocol run() {
+        return (MRClientProtocol) rpc.getProxy(HSClientProtocol.class,
+            NetUtils.createSocketAddr(serviceAddr), conf);
+      }
+    });
+  }
+}

Added: incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java (added)
+++ incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,473 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.v2.LogParams;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.*;
+import org.apache.hadoop.mapreduce.v2.api.records.*;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.Counters;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ClientToken;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ProtoUtils;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+
+public class ClientServiceDelegate {
+  private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
+  // Host value treated as "AM host unknown" in getProxy — presumably what the
+  // RM reports when the caller lacks VIEW_JOB access; confirm against RM code.
+  private static final String UNAVAILABLE = "N/A";
+
+  // Caches for per-user NotRunningJobs
+  // Keyed first by JobState, then by user name (see getNotRunningJob).
+  private HashMap<JobState, HashMap<String, NotRunningJob>> notRunningJobs;
+
+  private final Configuration conf;
+  private final JobID jobId;
+  private final ApplicationId appId;
+  private final ResourceMgrDelegate rm;
+  private final MRClientProtocol historyServerProxy;
+  // Live proxy to the AM (or history server once the job finished); reset to
+  // null to force a reconnect on the next invoke()/getProxy() call.
+  private MRClientProtocol realProxy = null;
+  private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+  // Sentinel cache key used when the ApplicationReport (and thus user) is null.
+  private static String UNKNOWN_USER = "Unknown User";
+  private String trackingUrl;
+
+  // Ensures the "AM unreachable / ACLs closed" notice is logged only once.
+  private boolean amAclDisabledStatusLogged = false;
+
+  /**
+   * @param conf client configuration; cloned so the IPC connect-retry count
+   *        can be lowered without mutating the caller's Configuration
+   * @param rm delegate used to fetch ApplicationReports from the RM
+   * @param jobId the MapReduce job this delegate serves
+   * @param historyServerProxy proxy to the job history server; may be null
+   *        when no history server is configured (see checkAndGetHSProxy)
+   */
+  public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm,
+      JobID jobId, MRClientProtocol historyServerProxy) {
+    this.conf = new Configuration(conf); // Cloning for modifying.
+    // For faster redirects from AM to HS.
+    this.conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
+        this.conf.getInt(MRJobConfig.MR_CLIENT_TO_AM_IPC_MAX_RETRIES,
+            MRJobConfig.DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES));
+    this.rm = rm;
+    this.jobId = jobId;
+    this.historyServerProxy = historyServerProxy;
+    this.appId = TypeConverter.toYarn(jobId).getAppId();
+    notRunningJobs = new HashMap<JobState, HashMap<String, NotRunningJob>>();
+  }
+
+  // Get the instance of the NotRunningJob corresponding to the specified
+  // user and state
+  private NotRunningJob getNotRunningJob(ApplicationReport applicationReport,
+      JobState state) {
+    synchronized (notRunningJobs) {
+      // Lazily create the per-state map on first use.
+      HashMap<String, NotRunningJob> map = notRunningJobs.get(state);
+      if (map == null) {
+        map = new HashMap<String, NotRunningJob>();
+        notRunningJobs.put(state, map);
+      }
+      // A null report means the user is unknown; cache under a sentinel key.
+      String user =
+          (applicationReport == null) ?
+              UNKNOWN_USER : applicationReport.getUser();
+      NotRunningJob notRunningJob = map.get(user);
+      if (notRunningJob == null) {
+        notRunningJob = new NotRunningJob(applicationReport, state);
+        map.put(user, notRunningJob);
+      }
+      return notRunningJob;
+    }
+  }
+
+  /**
+   * Returns a proxy for querying this job: the AM while the application is
+   * RUNNING, the history server once it has FINISHED, or a NotRunningJob
+   * stand-in for NEW/SUBMITTED/ACCEPTED/FAILED/KILLED states.  Polls the RM
+   * (sleeping 2s between attempts) while the AM host is not yet assigned.
+   */
+  private MRClientProtocol getProxy() throws YarnRemoteException {
+    if (realProxy != null) {
+      return realProxy;
+    }
+    
+    // Possibly allow nulls through the PB tunnel, otherwise deal with an exception
+    // and redirect to the history server.
+    ApplicationReport application = rm.getApplicationReport(appId);
+    if (application != null) {
+      trackingUrl = application.getTrackingUrl();
+    }
+    InetSocketAddress serviceAddr = null;
+    while (application == null
+        || YarnApplicationState.RUNNING == application
+            .getYarnApplicationState()) {
+      if (application == null) {
+        LOG.info("Could not get Job info from RM for job " + jobId
+            + ". Redirecting to job history server.");
+        return checkAndGetHSProxy(null, JobState.NEW);
+      }
+      try {
+        if (application.getHost() == null || "".equals(application.getHost())) {
+          LOG.debug("AM not assigned to Job. Waiting to get the AM ...");
+          Thread.sleep(2000);
+
+          LOG.debug("Application state is " + application.getYarnApplicationState());
+          application = rm.getApplicationReport(appId);
+          continue;
+        } else if (UNAVAILABLE.equals(application.getHost())) {
+          if (!amAclDisabledStatusLogged) {
+            LOG.info("Job " + jobId + " is running, but the host is unknown."
+                + " Verify user has VIEW_JOB access.");
+            amAclDisabledStatusLogged = true;
+          }
+          return getNotRunningJob(application, JobState.RUNNING);
+        }
+        if(!conf.getBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED, false)) {
+          UserGroupInformation newUgi = UserGroupInformation.createRemoteUser(
+              UserGroupInformation.getCurrentUser().getUserName());
+          serviceAddr = NetUtils.createSocketAddrForHost(
+              application.getHost(), application.getRpcPort());
+          // In secure mode the AM connection is authenticated with the
+          // client token from the ApplicationReport.
+          if (UserGroupInformation.isSecurityEnabled()) {
+            ClientToken clientToken = application.getClientToken();
+            Token<ClientTokenIdentifier> token =
+                ProtoUtils.convertFromProtoFormat(clientToken, serviceAddr);
+            newUgi.addToken(token);
+          }
+          LOG.debug("Connecting to " + serviceAddr);
+          final InetSocketAddress finalServiceAddr = serviceAddr;
+          realProxy = newUgi.doAs(new PrivilegedExceptionAction<MRClientProtocol>() {
+            @Override
+            public MRClientProtocol run() throws IOException {
+              return instantiateAMProxy(finalServiceAddr);
+            }
+          });
+        } else {
+          if (!amAclDisabledStatusLogged) {
+            LOG.info("Network ACL closed to AM for job " + jobId
+                + ". Not going to try to reach the AM.");
+            amAclDisabledStatusLogged = true;
+          }
+          return getNotRunningJob(null, JobState.RUNNING);
+        }
+        return realProxy;
+      } catch (IOException e) {
+        //possibly the AM has crashed
+        //there may be some time before AM is restarted
+        //keep retrying by getting the address from RM
+        LOG.info("Could not connect to " + serviceAddr +
+        ". Waiting for getting the latest AM address...");
+        try {
+          Thread.sleep(2000);
+        } catch (InterruptedException e1) {
+          // Restore the interrupt status before propagating.
+          Thread.currentThread().interrupt();
+          LOG.warn("getProxy() call interrupted", e1);
+          throw new YarnException(e1);
+        }
+        application = rm.getApplicationReport(appId);
+        if (application == null) {
+          LOG.info("Could not get Job info from RM for job " + jobId
+              + ". Redirecting to job history server.");
+          return checkAndGetHSProxy(null, JobState.RUNNING);
+        }
+      } catch (InterruptedException e) {
+        // Restore the interrupt status before propagating.
+        Thread.currentThread().interrupt();
+        LOG.warn("getProxy() call interrupted", e);
+        throw new YarnException(e);
+      }
+    }
+
+    /* We just want to return if it's allocating, so that we don't
+     * block on it. This is to be able to return job status
+     * on an allocating Application.
+     */
+    String user = application.getUser();
+    if (user == null) {
+      throw RPCUtil.getRemoteException("User is not set in the application report");
+    }
+    if (application.getYarnApplicationState() == YarnApplicationState.NEW
+        || application.getYarnApplicationState() == YarnApplicationState.SUBMITTED
+        || application.getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
+      realProxy = null;
+      return getNotRunningJob(application, JobState.NEW);
+    }
+
+    if (application.getYarnApplicationState() == YarnApplicationState.FAILED) {
+      realProxy = null;
+      return getNotRunningJob(application, JobState.FAILED);
+    }
+
+    if (application.getYarnApplicationState() == YarnApplicationState.KILLED) {
+      realProxy = null;
+      return getNotRunningJob(application, JobState.KILLED);
+    }
+
+    //History server can serve a job only if application
+    //succeeded.
+    if (application.getYarnApplicationState() == YarnApplicationState.FINISHED) {
+      LOG.info("Application state is completed. FinalApplicationStatus="
+          + application.getFinalApplicationStatus().toString()
+          + ". Redirecting to job history server");
+      realProxy = checkAndGetHSProxy(application, JobState.SUCCEEDED);
+    }
+    return realProxy;
+  }
+
+  /**
+   * Returns the history-server proxy, or a NotRunningJob stand-in in the
+   * given state when no history server is configured (proxy is null).
+   */
+  private MRClientProtocol checkAndGetHSProxy(
+      ApplicationReport applicationReport, JobState state) {
+    if (null == historyServerProxy) {
+      LOG.warn("Job History Server is not configured.");
+      return getNotRunningJob(applicationReport, state);
+    }
+    return historyServerProxy;
+  }
+
+  /**
+   * Creates an MRClientProtocol proxy to the ApplicationMaster at
+   * {@code serviceAddr}.  Invoked from getProxy() inside a doAs so that
+   * any client token on the calling UGI is used for the connection.
+   */
+  MRClientProtocol instantiateAMProxy(final InetSocketAddress serviceAddr)
+      throws IOException {
+    LOG.trace("Connecting to ApplicationMaster at: " + serviceAddr);
+    YarnRPC rpc = YarnRPC.create(conf);
+    MRClientProtocol proxy = 
+         (MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
+            serviceAddr, conf);
+    LOG.trace("Connected to ApplicationMaster at: " + serviceAddr);
+    return proxy;
+  }
+
+  /**
+   * Reflectively invokes {@code method} on the current proxy (AM or history
+   * server), retrying up to MR_CLIENT_MAX_RETRIES times.  A
+   * YarnRemoteException from the remote end is rethrown immediately; any
+   * other failure resets realProxy so the next attempt reconnects via
+   * getProxy().
+   *
+   * @throws IOException the last connection failure once retries are
+   *         exhausted
+   */
+  private synchronized Object invoke(String method, Class<?> argClass,
+      Object args) throws IOException {
+    Method methodOb = null;
+    try {
+      methodOb = MRClientProtocol.class.getMethod(method, argClass);
+    } catch (SecurityException e) {
+      throw new YarnException(e);
+    } catch (NoSuchMethodException e) {
+      throw new YarnException("Method name mismatch", e);
+    }
+    int maxRetries = this.conf.getInt(
+        MRJobConfig.MR_CLIENT_MAX_RETRIES,
+        MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES);
+    IOException lastException = null;
+    while (maxRetries > 0) {
+      try {
+        return methodOb.invoke(getProxy(), args);
+      } catch (YarnRemoteException yre) {
+        // The remote end reported a real error; do not retry.
+        LOG.warn("Exception thrown by remote end.", yre);
+        throw yre;
+      } catch (InvocationTargetException e) {
+        if (e.getTargetException() instanceof YarnRemoteException) {
+          LOG.warn("Error from remote end: " + e
+              .getTargetException().getLocalizedMessage());
+          LOG.debug("Tracing remote error ", e.getTargetException());
+          throw (YarnRemoteException) e.getTargetException();
+        }
+        LOG.debug("Failed to contact AM/History for job " + jobId +
+            " retrying..", e.getTargetException());
+        // Force reconnection by setting the proxy to null.
+        realProxy = null;
+        // HS/AMS shut down
+        maxRetries--;
+        // Keep the underlying cause attached for diagnosability.
+        lastException = new IOException(e.getMessage(), e);
+      } catch (Exception e) {
+        LOG.debug("Failed to contact AM/History for job " + jobId
+            + "  Will retry..", e);
+        // Force reconnection by setting the proxy to null.
+        realProxy = null;
+        // RM shutdown
+        maxRetries--;
+        lastException = new IOException(e.getMessage(), e);
+      }
+    }
+    // If retries were configured to zero, no attempt was ever made; fail
+    // with an explicit message instead of throwing a null reference.
+    if (lastException == null) {
+      lastException = new IOException(
+          "No attempts permitted to contact AM/History for job " + jobId);
+    }
+    throw lastException;
+  }
+
+  /**
+   * Fetches the counters for the given job from the AM or history server
+   * (via invoke) and converts them to the mapreduce-API Counters type.
+   */
+  public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
+  InterruptedException {
+    org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter.toYarn(arg0);
+      GetCountersRequest request = recordFactory.newRecordInstance(GetCountersRequest.class);
+      request.setJobId(jobID);
+      Counters cnt = ((GetCountersResponse)
+          invoke("getCounters", GetCountersRequest.class, request)).getCounters();
+      return TypeConverter.fromYarn(cnt);
+
+  }
+
+  /**
+   * Retrieves a window of task-attempt completion events for the job,
+   * starting at {@code arg1} and returning at most {@code arg2} events,
+   * converted to the old-API array form.
+   */
+  public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1, int arg2)
+      throws IOException, InterruptedException {
+    GetTaskAttemptCompletionEventsRequest eventsRequest = recordFactory
+        .newRecordInstance(GetTaskAttemptCompletionEventsRequest.class);
+    eventsRequest.setJobId(TypeConverter.toYarn(arg0));
+    eventsRequest.setFromEventId(arg1);
+    eventsRequest.setMaxEvents(arg2);
+    GetTaskAttemptCompletionEventsResponse response =
+        (GetTaskAttemptCompletionEventsResponse) invoke(
+            "getTaskAttemptCompletionEvents",
+            GetTaskAttemptCompletionEventsRequest.class, eventsRequest);
+    List<org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent>
+        events = response.getCompletionEventList();
+    return TypeConverter.fromYarn(events.toArray(
+        new org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent[0]));
+  }
+
+  /**
+   * Returns the diagnostic messages recorded for the given task attempt as a
+   * plain string array.
+   */
+  public String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID arg0)
+      throws IOException, InterruptedException {
+    GetDiagnosticsRequest diagRequest = recordFactory
+        .newRecordInstance(GetDiagnosticsRequest.class);
+    diagRequest.setTaskAttemptId(TypeConverter.toYarn(arg0));
+    List<String> diagnosticsList = ((GetDiagnosticsResponse) invoke(
+        "getDiagnostics", GetDiagnosticsRequest.class, diagRequest))
+        .getDiagnosticsList();
+    String[] diagnostics = new String[diagnosticsList.size()];
+    for (int idx = 0; idx < diagnostics.length; idx++) {
+      diagnostics[idx] = diagnosticsList.get(idx).toString();
+    }
+    return diagnostics;
+  }
+  
+  /**
+   * Fetches the job report from the AM/History server and converts it to an
+   * old-API {@link JobStatus}, filling in the job file and tracking URL when
+   * the report leaves them unset. Returns null when no report is available.
+   */
+  public JobStatus getJobStatus(JobID oldJobID) throws IOException {
+    GetJobReportRequest reportRequest =
+        recordFactory.newRecordInstance(GetJobReportRequest.class);
+    reportRequest.setJobId(TypeConverter.toYarn(oldJobID));
+    JobReport report = ((GetJobReportResponse) invoke("getJobReport",
+        GetJobReportRequest.class, reportRequest)).getJobReport();
+    if (report == null) {
+      return null;
+    }
+    // Some servers do not populate the job file; derive it from the
+    // staging-area layout so callers always get a value.
+    if (StringUtils.isEmpty(report.getJobFile())) {
+      report.setJobFile(MRApps.getJobFile(conf, report.getUser(), oldJobID));
+    }
+    // Prefer the URL reported by the server, falling back to the one cached
+    // locally; only prepend the http/https scheme when the URL is usable.
+    String reportedUrl = report.getTrackingUrl();
+    String url = StringUtils.isNotEmpty(reportedUrl) ? reportedUrl : trackingUrl;
+    if (!UNAVAILABLE.equals(url)) {
+      url = HttpConfig.getSchemePrefix() + url;
+    }
+    return TypeConverter.fromYarn(report, url);
+  }
+
+  /**
+   * Retrieves the reports of every task of the requested type for the job,
+   * converted to old-API {@link org.apache.hadoop.mapreduce.TaskReport}s.
+   */
+  public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType taskType)
+       throws IOException{
+    GetTaskReportsRequest reportsRequest =
+        recordFactory.newRecordInstance(GetTaskReportsRequest.class);
+    reportsRequest.setJobId(TypeConverter.toYarn(oldJobID));
+    reportsRequest.setTaskType(TypeConverter.toYarn(taskType));
+
+    GetTaskReportsResponse response = (GetTaskReportsResponse) invoke(
+        "getTaskReports", GetTaskReportsRequest.class, reportsRequest);
+    List<org.apache.hadoop.mapreduce.v2.api.records.TaskReport> yarnReports =
+        response.getTaskReportList();
+
+    return TypeConverter.fromYarn(yarnReports)
+        .toArray(new org.apache.hadoop.mapreduce.TaskReport[0]);
+  }
+
+  /**
+   * Kills or fails a single task attempt. When {@code fail} is true the
+   * attempt is marked failed (counts against retry limits); otherwise it is
+   * simply killed. Always returns true once the RPC completes.
+   */
+  public boolean killTask(TaskAttemptID taskAttemptID, boolean fail)
+       throws IOException {
+    org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptId =
+        TypeConverter.toYarn(taskAttemptID);
+    if (fail) {
+      FailTaskAttemptRequest request =
+          recordFactory.newRecordInstance(FailTaskAttemptRequest.class);
+      request.setTaskAttemptId(yarnAttemptId);
+      invoke("failTaskAttempt", FailTaskAttemptRequest.class, request);
+    } else {
+      KillTaskAttemptRequest request =
+          recordFactory.newRecordInstance(KillTaskAttemptRequest.class);
+      request.setTaskAttemptId(yarnAttemptId);
+      invoke("killTaskAttempt", KillTaskAttemptRequest.class, request);
+    }
+    return true;
+  }
+
+  /**
+   * Asks the AM/History server to kill the given job. Always returns true
+   * once the RPC completes.
+   */
+  public boolean killJob(JobID oldJobID)
+       throws IOException {
+    KillJobRequest request =
+        recordFactory.newRecordInstance(KillJobRequest.class);
+    request.setJobId(TypeConverter.toYarn(oldJobID));
+    invoke("killJob", KillJobRequest.class, request);
+    return true;
+  }
+
+  /**
+   * Builds the {@link LogParams} (container id, application id, node id and
+   * user) needed to locate aggregated logs for either a specific task
+   * attempt or, when {@code oldTaskAttemptID} is null, the last AM attempt
+   * of the job. Logs are only addressable once the job has reached a
+   * terminal state.
+   *
+   * @param oldJobID         the job whose logs are wanted
+   * @param oldTaskAttemptID a specific attempt, or null for the AM's logs
+   * @return log location parameters for the requested attempt
+   * @throws IOException if the job is still running or the server did not
+   *         report enough information to locate the logs
+   */
+  public LogParams getLogFilePath(JobID oldJobID, TaskAttemptID oldTaskAttemptID)
+      throws YarnRemoteException, IOException {
+    org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
+        TypeConverter.toYarn(oldJobID);
+    GetJobReportRequest request =
+        recordFactory.newRecordInstance(GetJobReportRequest.class);
+    request.setJobId(jobId);
+
+    JobReport report =
+        ((GetJobReportResponse) invoke("getJobReport",
+            GetJobReportRequest.class, request)).getJobReport();
+    // Log aggregation is only complete for jobs in a terminal state.
+    if (EnumSet.of(JobState.SUCCEEDED, JobState.FAILED, JobState.KILLED,
+        JobState.ERROR).contains(report.getJobState())) {
+      if (oldTaskAttemptID != null) {
+        GetTaskAttemptReportRequest taRequest =
+            recordFactory.newRecordInstance(GetTaskAttemptReportRequest.class);
+        taRequest.setTaskAttemptId(TypeConverter.toYarn(oldTaskAttemptID));
+        TaskAttemptReport taReport =
+            ((GetTaskAttemptReportResponse) invoke("getTaskAttemptReport",
+                GetTaskAttemptReportRequest.class, taRequest))
+                .getTaskAttemptReport();
+        if (taReport.getContainerId() == null
+            || taReport.getNodeManagerHost() == null) {
+          throw new IOException("Unable to get log information for task: "
+              + oldTaskAttemptID);
+        }
+        return new LogParams(
+            taReport.getContainerId().toString(),
+            taReport.getContainerId().getApplicationAttemptId()
+                .getApplicationId().toString(),
+            BuilderUtils.newNodeId(taReport.getNodeManagerHost(),
+                taReport.getNodeManagerPort()).toString(), report.getUser());
+      } else {
+        if (report.getAMInfos() == null || report.getAMInfos().size() == 0) {
+          throw new IOException("Unable to get log information for job: "
+              + oldJobID);
+        }
+        // Use the most recent AM attempt's container for the AM logs.
+        AMInfo amInfo = report.getAMInfos().get(report.getAMInfos().size() - 1);
+        return new LogParams(
+            amInfo.getContainerId().toString(),
+            amInfo.getAppAttemptId().getApplicationId().toString(),
+            BuilderUtils.newNodeId(amInfo.getNodeManagerHost(),
+                amInfo.getNodeManagerPort()).toString(), report.getUser());
+      }
+    } else {
+      throw new IOException("Cannot get log path for an in-progress job");
+    }
+  }
+}

Added: incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/NotRunningJob.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/NotRunningJob.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/NotRunningJob.java (added)
+++ incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/NotRunningJob.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,241 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
+import org.apache.hadoop.mapreduce.v2.api.records.Counters;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+
+public class NotRunningJob implements MRClientProtocol {
+
+  private RecordFactory recordFactory =
+    RecordFactoryProvider.getRecordFactory(null);
+
+  private final JobState jobState;
+  private final ApplicationReport applicationReport;
+
+
+  private ApplicationReport getUnknownApplicationReport() {
+    ApplicationId unknownAppId = recordFactory
+        .newRecordInstance(ApplicationId.class);
+    ApplicationAttemptId unknownAttemptId = recordFactory
+        .newRecordInstance(ApplicationAttemptId.class);
+
+    // Setting AppState to NEW and finalStatus to UNDEFINED as they are never
+    // used for a non running job
+    return BuilderUtils.newApplicationReport(unknownAppId, unknownAttemptId,
+        "N/A", "N/A", "N/A", "N/A", 0, null, YarnApplicationState.NEW, "N/A",
+        "N/A", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A");
+  }
+
+  NotRunningJob(ApplicationReport applicationReport, JobState jobState) {
+    this.applicationReport =
+        (applicationReport ==  null) ?
+            getUnknownApplicationReport() : applicationReport;
+    this.jobState = jobState;
+  }
+
+  @Override
+  public FailTaskAttemptResponse failTaskAttempt(
+      FailTaskAttemptRequest request) throws YarnRemoteException {
+    FailTaskAttemptResponse resp =
+      recordFactory.newRecordInstance(FailTaskAttemptResponse.class);
+    return resp;
+  }
+
+  @Override
+  public GetCountersResponse getCounters(GetCountersRequest request)
+      throws YarnRemoteException {
+    GetCountersResponse resp =
+      recordFactory.newRecordInstance(GetCountersResponse.class);
+    Counters counters = recordFactory.newRecordInstance(Counters.class);
+    counters.addAllCounterGroups(new HashMap<String, CounterGroup>());
+    resp.setCounters(counters);
+    return resp;
+  }
+
+  @Override
+  public GetDiagnosticsResponse getDiagnostics(GetDiagnosticsRequest request)
+      throws YarnRemoteException {
+    GetDiagnosticsResponse resp =
+      recordFactory.newRecordInstance(GetDiagnosticsResponse.class);
+    resp.addDiagnostics("");
+    return resp;
+  }
+
+  @Override
+  public GetJobReportResponse getJobReport(GetJobReportRequest request)
+      throws YarnRemoteException {
+    JobReport jobReport =
+      recordFactory.newRecordInstance(JobReport.class);
+    jobReport.setJobId(request.getJobId());
+    jobReport.setJobState(jobState);
+    jobReport.setUser(applicationReport.getUser());
+    jobReport.setStartTime(applicationReport.getStartTime());
+    jobReport.setDiagnostics(applicationReport.getDiagnostics());
+    jobReport.setJobName(applicationReport.getName());
+    jobReport.setTrackingUrl(applicationReport.getTrackingUrl());
+    jobReport.setFinishTime(applicationReport.getFinishTime());
+
+    GetJobReportResponse resp =
+        recordFactory.newRecordInstance(GetJobReportResponse.class);
+    resp.setJobReport(jobReport);
+    return resp;
+  }
+
+  @Override
+  public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(
+      GetTaskAttemptCompletionEventsRequest request)
+      throws YarnRemoteException {
+    GetTaskAttemptCompletionEventsResponse resp =
+      recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsResponse.class);
+    resp.addAllCompletionEvents(new ArrayList<TaskAttemptCompletionEvent>());
+    return resp;
+  }
+
+  @Override
+  public GetTaskAttemptReportResponse getTaskAttemptReport(
+      GetTaskAttemptReportRequest request) throws YarnRemoteException {
+    //not invoked by anybody
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetTaskReportResponse getTaskReport(GetTaskReportRequest request)
+      throws YarnRemoteException {
+    GetTaskReportResponse resp =
+      recordFactory.newRecordInstance(GetTaskReportResponse.class);
+    TaskReport report = recordFactory.newRecordInstance(TaskReport.class);
+    report.setTaskId(request.getTaskId());
+    report.setTaskState(TaskState.NEW);
+    Counters counters = recordFactory.newRecordInstance(Counters.class);
+    counters.addAllCounterGroups(new HashMap<String, CounterGroup>());
+    report.setCounters(counters);
+    report.addAllRunningAttempts(new ArrayList<TaskAttemptId>());
+    return resp;
+  }
+
+  @Override
+  public GetTaskReportsResponse getTaskReports(GetTaskReportsRequest request)
+      throws YarnRemoteException {
+    GetTaskReportsResponse resp =
+      recordFactory.newRecordInstance(GetTaskReportsResponse.class);
+    resp.addAllTaskReports(new ArrayList<TaskReport>());
+    return resp;
+  }
+
+  @Override
+  public KillJobResponse killJob(KillJobRequest request)
+      throws YarnRemoteException {
+    KillJobResponse resp =
+      recordFactory.newRecordInstance(KillJobResponse.class);
+    return resp;
+  }
+
+  @Override
+  public KillTaskResponse killTask(KillTaskRequest request)
+      throws YarnRemoteException {
+    KillTaskResponse resp =
+      recordFactory.newRecordInstance(KillTaskResponse.class);
+    return resp;
+  }
+
+  @Override
+  public KillTaskAttemptResponse killTaskAttempt(
+      KillTaskAttemptRequest request) throws YarnRemoteException {
+    KillTaskAttemptResponse resp =
+      recordFactory.newRecordInstance(KillTaskAttemptResponse.class);
+    return resp;
+  }
+
+  @Override
+  public GetDelegationTokenResponse getDelegationToken(
+      GetDelegationTokenRequest request) throws YarnRemoteException {
+    /* Should not be invoked by anyone. */
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public RenewDelegationTokenResponse renewDelegationToken(
+      RenewDelegationTokenRequest request) throws YarnRemoteException {
+    /* Should not be invoked by anyone. */
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public CancelDelegationTokenResponse cancelDelegationToken(
+      CancelDelegationTokenRequest request) throws YarnRemoteException {
+    /* Should not be invoked by anyone. */
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public InetSocketAddress getConnectAddress() {
+    /* Should not be invoked by anyone.  Normally used to set token service */
+    throw new NotImplementedException();
+  }
+}

Added: incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java (added)
+++ incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,168 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.QueueAclsInfo;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ProtoUtils;
+
+public class ResourceMgrDelegate extends YarnClientImpl {
+  private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class);
+      
+  private YarnConfiguration conf;
+  private GetNewApplicationResponse application;
+  private ApplicationId applicationId;
+
+  /**
+   * Delegate responsible for communicating with the Resource Manager's {@link ClientRMProtocol}.
+   * @param conf the configuration object.
+   */
+  public ResourceMgrDelegate(YarnConfiguration conf) {
+    super();
+    this.conf = conf;
+    init(conf);
+    start();
+  }
+
+  public TaskTrackerInfo[] getActiveTrackers() throws IOException,
+      InterruptedException {
+    return TypeConverter.fromYarnNodes(super.getNodeReports());
+  }
+
+  public JobStatus[] getAllJobs() throws IOException, InterruptedException {
+    return TypeConverter.fromYarnApps(super.getApplicationList(), this.conf);
+  }
+
+  public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException,
+      InterruptedException {
+    // TODO: Implement getBlacklistedTrackers
+    LOG.warn("getBlacklistedTrackers - Not implemented yet");
+    return new TaskTrackerInfo[0];
+  }
+
+  public ClusterMetrics getClusterMetrics() throws IOException,
+      InterruptedException {
+    YarnClusterMetrics metrics = super.getYarnClusterMetrics();
+    ClusterMetrics oldMetrics = new ClusterMetrics(1, 1, 1, 1, 1, 1, 
+        metrics.getNumNodeManagers() * 10, metrics.getNumNodeManagers() * 2, 1,
+        metrics.getNumNodeManagers(), 0, 0);
+    return oldMetrics;
+  }
+
+  @SuppressWarnings("rawtypes")
+  public Token getDelegationToken(Text renewer) throws IOException,
+      InterruptedException {
+    return ProtoUtils.convertFromProtoFormat(
+      super.getRMDelegationToken(renewer), rmAddress);
+  }
+
+  public String getFilesystemName() throws IOException, InterruptedException {
+    return FileSystem.get(conf).getUri().toString();
+  }
+
+  public JobID getNewJobID() throws IOException, InterruptedException {
+    this.application = super.getNewApplication();
+    this.applicationId = this.application.getApplicationId();
+    return TypeConverter.fromYarn(applicationId);
+  }
+
+  public QueueInfo getQueue(String queueName) throws IOException,
+  InterruptedException {
+    return TypeConverter.fromYarn(
+        super.getQueueInfo(queueName), this.conf);
+  }
+
+  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
+      InterruptedException {
+    return TypeConverter.fromYarnQueueUserAclsInfo(super
+      .getQueueAclsInfo());
+  }
+
+  public QueueInfo[] getQueues() throws IOException, InterruptedException {
+    return TypeConverter.fromYarnQueueInfo(super.getAllQueues(), this.conf);
+  }
+
+  public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
+    return TypeConverter.fromYarnQueueInfo(super.getRootQueueInfos(), this.conf);
+  }
+
+  public QueueInfo[] getChildQueues(String parent) throws IOException,
+      InterruptedException {
+    return TypeConverter.fromYarnQueueInfo(super.getChildQueueInfos(parent),
+      this.conf);
+  }
+
+  public String getStagingAreaDir() throws IOException, InterruptedException {
+//    Path path = new Path(MRJobConstants.JOB_SUBMIT_DIR);
+    String user = 
+      UserGroupInformation.getCurrentUser().getShortUserName();
+    Path path = MRApps.getStagingAreaDir(conf, user);
+    LOG.debug("getStagingAreaDir: dir=" + path);
+    return path.toString();
+  }
+
+
+  public String getSystemDir() throws IOException, InterruptedException {
+    Path sysDir = new Path(MRJobConfig.JOB_SUBMIT_DIR);
+    //FileContext.getFileContext(conf).delete(sysDir, true);
+    return sysDir.toString();
+  }
+  
+
+  public long getTaskTrackerExpiryInterval() throws IOException,
+      InterruptedException {
+    return 0;
+  }
+  
+  public void setJobPriority(JobID arg0, String arg1) throws IOException,
+      InterruptedException {
+    return;
+  }
+
+
+  public long getProtocolVersion(String arg0, long arg1) throws IOException {
+    return 0;
+  }
+
+  public ApplicationId getApplicationId() {
+    return applicationId;
+  }
+}

Added: incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java (added)
+++ incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,622 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Master;
+import org.apache.hadoop.mapred.TaskLog;
+import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.hadoop.mapreduce.QueueAclsInfo;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.TaskReport;
+import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.LogParams;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.DelegationToken;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.ProtoUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This class enables the current JobClient (0.22 hadoop) to run on YARN.
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class YARNRunner implements ClientProtocol {
+
+  private static final Log LOG = LogFactory.getLog(YARNRunner.class);
+
+  private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+  private ResourceMgrDelegate resMgrDelegate;
+  private ClientCache clientCache;
+  private Configuration conf;
+  private final FileContext defaultFileContext;
+  
+  /* usually is false unless the jobclient get delegation token is 
+   *  called. This is a hack wherein we do return a token from RM 
+   *  on getDelegationtoken but due to the restricted api on jobclient
+   *  we just add a job history DT token when submitting a job.
+   */
+  private static final boolean DEFAULT_HS_DELEGATION_TOKEN_REQUIRED = 
+      false;
+  
+  /**
+   * Creates a YARNRunner, which encapsulates the client interface to YARN,
+   * with a default {@link ResourceMgrDelegate}.
+   * @param conf the configuration object for the client
+   */
+  public YARNRunner(Configuration conf) {
+   this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf)));
+  }
+
+  /**
+   * Similar to {@link #YARNRunner(Configuration)} but allowing injecting
+   * {@link ResourceMgrDelegate}. Enables mocking and testing.
+   * @param conf the configuration object for the client
+   * @param resMgrDelegate the resourcemanager client handle.
+   */
+  public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {
+   this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate));
+  }
+
+  /**
+   * Similar to {@link YARNRunner#YARNRunner(Configuration, ResourceMgrDelegate)}
+   * but allowing injecting {@link ClientCache}. Enable mocking and testing.
+   * @param conf the configuration object
+   * @param resMgrDelegate the resource manager delegate
+   * @param clientCache the client cache object.
+   */
+  public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
+      ClientCache clientCache) {
+    this.conf = conf;
+    try {
+      this.resMgrDelegate = resMgrDelegate;
+      this.clientCache = clientCache;
+      // FileContext.getFileContext can throw; wrap as unchecked since a
+      // client without a usable default filesystem cannot function.
+      this.defaultFileContext = FileContext.getFileContext(this.conf);
+    } catch (UnsupportedFileSystemException ufe) {
+      throw new RuntimeException("Error in instantiating YarnClient", ufe);
+    }
+  }
+  
+  @Private
+  /**
+   * Used for testing mostly.
+   * @param resMgrDelegate the resource manager delegate to set to.
+   */
+  public void setResourceMgrDelegate(ResourceMgrDelegate resMgrDelegate) {
+    this.resMgrDelegate = resMgrDelegate;
+  }
+  
+  // Token cancellation must go through the Token API, not this protocol.
+  @Override
+  public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
+      throws IOException, InterruptedException {
+    throw new UnsupportedOperationException("Use Token.renew instead");
+  }
+
+  // Cluster-level queries below delegate directly to the ResourceManager
+  // via the ResourceMgrDelegate.
+  @Override
+  public TaskTrackerInfo[] getActiveTrackers() throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getActiveTrackers();
+  }
+
+  @Override
+  public JobStatus[] getAllJobs() throws IOException, InterruptedException {
+    return resMgrDelegate.getAllJobs();
+  }
+
+  @Override
+  public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getBlacklistedTrackers();
+  }
+
+  @Override
+  public ClusterMetrics getClusterMetrics() throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getClusterMetrics();
+  }
+
+  // Fetches a delegation token from the MR JobHistory server, with the
+  // cluster's master principal as renewer, and converts it to a
+  // security-layer Token bound to the HS address.
+  @VisibleForTesting
+  Token<?> getDelegationTokenFromHS(MRClientProtocol hsProxy)
+      throws IOException, InterruptedException {
+    GetDelegationTokenRequest request = recordFactory
+      .newRecordInstance(GetDelegationTokenRequest.class);
+    request.setRenewer(Master.getMasterPrincipal(conf));
+    DelegationToken mrDelegationToken = hsProxy.getDelegationToken(request)
+      .getDelegationToken();
+    return ProtoUtils.convertFromProtoFormat(mrDelegationToken,
+                                             hsProxy.getConnectAddress());
+  }
+
+  @Override
+  public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
+      throws IOException, InterruptedException {
+    // The token is only used for serialization. So the type information
+    // mismatch should be fine.
+    return resMgrDelegate.getDelegationToken(renewer);
+  }
+
+  @Override
+  public String getFilesystemName() throws IOException, InterruptedException {
+    return resMgrDelegate.getFilesystemName();
+  }
+
+  // The following job-id, queue and directory queries all delegate to the
+  // ResourceMgrDelegate unchanged.
+  @Override
+  public JobID getNewJobID() throws IOException, InterruptedException {
+    return resMgrDelegate.getNewJobID();
+  }
+
+  @Override
+  public QueueInfo getQueue(String queueName) throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getQueue(queueName);
+  }
+
+  @Override
+  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getQueueAclsForCurrentUser();
+  }
+
+  @Override
+  public QueueInfo[] getQueues() throws IOException, InterruptedException {
+    return resMgrDelegate.getQueues();
+  }
+
+  @Override
+  public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
+    return resMgrDelegate.getRootQueues();
+  }
+
+  @Override
+  public QueueInfo[] getChildQueues(String parent) throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getChildQueues(parent);
+  }
+
+  @Override
+  public String getStagingAreaDir() throws IOException, InterruptedException {
+    return resMgrDelegate.getStagingAreaDir();
+  }
+
+  @Override
+  public String getSystemDir() throws IOException, InterruptedException {
+    return resMgrDelegate.getSystemDir();
+  }
+
+  @Override
+  public long getTaskTrackerExpiryInterval() throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getTaskTrackerExpiryInterval();
+  }
+
+  /**
+   * Submits the job to YARN: optionally obtains a JobHistory delegation
+   * token, persists the credentials into the submit dir, builds the AM
+   * submission context, submits it to the ResourceManager, and returns the
+   * initial job status fetched from the job's client.
+   */
+  @Override
+  public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
+  throws IOException, InterruptedException {
+    
+    /* check if we have a hsproxy, if not, no need */
+    MRClientProtocol hsProxy = clientCache.getInitializedHSProxy();
+    if (hsProxy != null) {
+      // JobClient will set this flag if getDelegationToken is called, if so, get
+      // the delegation tokens for the HistoryServer also.
+
+      // TODO Fix this. Temporarily dumping this into TezConfig
+      if (conf.getBoolean(MRJobConfig.HS_DELEGATION_TOKEN_REQUIRED,
+          DEFAULT_HS_DELEGATION_TOKEN_REQUIRED)) {
+        Token hsDT = getDelegationTokenFromHS(hsProxy);
+        ts.addToken(hsDT.getService(), hsDT);
+      }
+    }
+
+    // Upload only in security mode: TODO
+    Path applicationTokensFile =
+        new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
+    try {
+      ts.writeTokenStorageFile(applicationTokensFile, conf);
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+
+    // Construct necessary information to start the MR AM
+    ApplicationSubmissionContext appContext =
+      createApplicationSubmissionContext(conf, jobSubmitDir, ts);
+
+    // Submit to ResourceManager
+    ApplicationId applicationId = resMgrDelegate.submitApplication(appContext);
+
+    // Fail fast if the application was rejected or died immediately after
+    // submission; surface the RM diagnostics in the exception message.
+    ApplicationReport appMaster = resMgrDelegate
+        .getApplicationReport(applicationId);
+    String diagnostics =
+        (appMaster == null ?
+            "application report is null" : appMaster.getDiagnostics());
+    if (appMaster == null || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
+        || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
+      throw new IOException("Failed to run job : " +
+        diagnostics);
+    }
+    return clientCache.getClient(jobId).getJobStatus(jobId);
+  }
+
+  // Builds a LocalResource record for path p: size/timestamp come from the
+  // file's status and the URL is the fully-resolved filesystem path, so the
+  // NodeManager can verify and localize it. Visibility is always
+  // APPLICATION-scoped.
+  private LocalResource createApplicationResource(FileContext fs, Path p, LocalResourceType type)
+      throws IOException {
+    LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class);
+    FileStatus rsrcStat = fs.getFileStatus(p);
+    rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs
+        .getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
+    rsrc.setSize(rsrcStat.getLen());
+    rsrc.setTimestamp(rsrcStat.getModificationTime());
+    rsrc.setType(type);
+    rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+    return rsrc;
+  }
+
+  /**
+   * Assembles the full ApplicationSubmissionContext for the AM container:
+   * resource capability, local resources (job conf, job jar, splits,
+   * tokens), the java command line, environment, distributed cache and
+   * ACLs, then wraps them with application id/name/queue settings.
+   */
+  public ApplicationSubmissionContext createApplicationSubmissionContext(
+      Configuration jobConf,
+      String jobSubmitDir, Credentials ts) throws IOException {
+    ApplicationId applicationId = resMgrDelegate.getApplicationId();
+
+    // Setup resource requirements
+    Resource capability = recordFactory.newRecordInstance(Resource.class);
+    capability.setMemory(
+        conf.getInt(
+            MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
+            )
+        );
+    capability.setVirtualCores(
+        conf.getInt(
+            MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES
+            )
+        );
+    LOG.debug("AppMaster capability = " + capability);
+
+    // Setup LocalResources
+    Map<String, LocalResource> localResources =
+        new HashMap<String, LocalResource>();
+
+    Path jobConfPath = new Path(jobSubmitDir, MRJobConfig.JOB_CONF_FILE);
+
+    // NOTE(review): yarnUrlForJobSubmitDir is only used for the debug log
+    // below; confirm it is not meant to be attached to the context.
+    URL yarnUrlForJobSubmitDir = ConverterUtils
+        .getYarnUrlFromPath(defaultFileContext.getDefaultFileSystem()
+            .resolvePath(
+                defaultFileContext.makeQualified(new Path(jobSubmitDir))));
+    LOG.debug("Creating setup context, jobSubmitDir url is "
+        + yarnUrlForJobSubmitDir);
+
+    localResources.put(MRJobConfig.JOB_CONF_FILE,
+        createApplicationResource(defaultFileContext,
+            jobConfPath, LocalResourceType.FILE));
+    if (jobConf.get(MRJobConfig.JAR) != null) {
+      // Localize the job jar as a PATTERN resource so only the configured
+      // subset of its contents is unpacked.
+      Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR));
+      LocalResource rc = createApplicationResource(defaultFileContext,
+          jobJarPath, 
+          LocalResourceType.PATTERN);
+      String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN, 
+          JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
+      rc.setPattern(pattern);
+      localResources.put(MRJobConfig.JOB_JAR, rc);
+    } else {
+      // Job jar may be null. For e.g, for pipes, the job jar is the hadoop
+      // mapreduce jar itself which is already on the classpath.
+      LOG.info("Job jar is not present. "
+          + "Not adding any jar to the list of resources.");
+    }
+
+    // TODO gross hack
+    for (String s : new String[] {
+        MRJobConfig.JOB_SPLIT,
+        MRJobConfig.JOB_SPLIT_METAINFO,
+        MRJobConfig.APPLICATION_TOKENS_FILE }) {
+      localResources.put(
+          MRJobConfig.JOB_SUBMIT_DIR + "/" + s,
+          createApplicationResource(defaultFileContext,
+              new Path(jobSubmitDir, s), LocalResourceType.FILE));
+    }
+
+    // Setup security tokens
+    DataOutputBuffer dob = new DataOutputBuffer();
+    ts.writeTokenStorageToStream(dob);
+    ByteBuffer securityTokens  = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+
+    // Setup the command to run the AM
+    List<String> vargs = new ArrayList<String>(8);
+    vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
+
+    // TODO: why do we use 'conf' some places and 'jobConf' others?
+    long logSize = TaskLog.getTaskLogLength(new JobConf(conf));
+    String logLevel = jobConf.get(
+        MRJobConfig.MR_AM_LOG_LEVEL, MRJobConfig.DEFAULT_MR_AM_LOG_LEVEL);
+    MRApps.addLog4jSystemProperties(logLevel, logSize, vargs);
+
+    // Check for Java Lib Path usage in MAP and REDUCE configs
+    warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS,""), "map", 
+        MRJobConfig.MAP_JAVA_OPTS, MRJobConfig.MAP_ENV);
+    warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,""), "map", 
+        MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, MRJobConfig.MAPRED_ADMIN_USER_ENV);
+    warnForJavaLibPath(conf.get(MRJobConfig.REDUCE_JAVA_OPTS,""), "reduce", 
+        MRJobConfig.REDUCE_JAVA_OPTS, MRJobConfig.REDUCE_ENV);
+    warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,""), "reduce", 
+        MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, MRJobConfig.MAPRED_ADMIN_USER_ENV);   
+
+    // Add AM admin command opts before user command opts
+    // so that it can be overridden by user
+    String mrAppMasterAdminOptions = conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
+        MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
+    warnForJavaLibPath(mrAppMasterAdminOptions, "app master", 
+        MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.MR_AM_ADMIN_USER_ENV);
+    vargs.add(mrAppMasterAdminOptions);
+    
+    // Add AM user command opts
+    String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
+        MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
+    warnForJavaLibPath(mrAppMasterUserOptions, "app master", 
+        MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV);
+    vargs.add(mrAppMasterUserOptions);
+    
+    vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
+    // Redirect AM stdout/stderr into the container log dir.
+    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+        Path.SEPARATOR + ApplicationConstants.STDOUT);
+    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+        Path.SEPARATOR + ApplicationConstants.STDERR);
+
+
+    Vector<String> vargsFinal = new Vector<String>(8);
+    // Final command
+    StringBuilder mergedCommand = new StringBuilder();
+    for (CharSequence str : vargs) {
+      mergedCommand.append(str).append(" ");
+    }
+    vargsFinal.add(mergedCommand.toString());
+
+    LOG.debug("Command to launch container for ApplicationMaster is : "
+        + mergedCommand);
+
+    // Setup the CLASSPATH in environment
+    // i.e. add { Hadoop jars, job jar, CWD } to classpath.
+    Map<String, String> environment = new HashMap<String, String>();
+    MRApps.setClasspath(environment, conf);
+    
+    // Setup the environment variables for Admin first
+    MRApps.setEnvFromInputString(environment, 
+        conf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV));
+    // Setup the environment variables (LD_LIBRARY_PATH, etc)
+    MRApps.setEnvFromInputString(environment, 
+        conf.get(MRJobConfig.MR_AM_ENV));
+
+    // Parse distributed cache
+    MRApps.setupDistributedCache(jobConf, localResources);
+
+    // View/modify ACLs come from the per-job config, with MR defaults.
+    Map<ApplicationAccessType, String> acls
+        = new HashMap<ApplicationAccessType, String>(2);
+    acls.put(ApplicationAccessType.VIEW_APP, jobConf.get(
+        MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
+    acls.put(ApplicationAccessType.MODIFY_APP, jobConf.get(
+        MRJobConfig.JOB_ACL_MODIFY_JOB,
+        MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));
+
+    // Setup ContainerLaunchContext for AM container
+    ContainerLaunchContext amContainer = BuilderUtils
+        .newContainerLaunchContext(null, UserGroupInformation
+            .getCurrentUser().getShortUserName(), capability, localResources,
+            environment, vargsFinal, null, securityTokens, acls);
+
+    // Set up the ApplicationSubmissionContext
+    ApplicationSubmissionContext appContext =
+        recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+    appContext.setApplicationId(applicationId);                // ApplicationId
+    appContext.setUser(                                        // User name
+        UserGroupInformation.getCurrentUser().getShortUserName());
+    appContext.setQueue(                                       // Queue name
+        jobConf.get(JobContext.QUEUE_NAME,
+        YarnConfiguration.DEFAULT_QUEUE_NAME));
+    appContext.setApplicationName(                             // Job name
+        jobConf.get(JobContext.JOB_NAME,
+        YarnConfiguration.DEFAULT_APPLICATION_NAME));
+    appContext.setCancelTokensWhenComplete(
+        conf.getBoolean(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, true));
+    appContext.setAMContainerSpec(amContainer);         // AM Container
+
+    return appContext;
+  }
+
+  @Override
+  public void setJobPriority(JobID arg0, String arg1) throws IOException,
+      InterruptedException {
+    resMgrDelegate.setJobPriority(arg0, arg1);
+  }
+
+  @Override
+  public long getProtocolVersion(String arg0, long arg1) throws IOException {
+    return resMgrDelegate.getProtocolVersion(arg0, arg1);
+  }
+
+  // Token renewal must go through the Token API, not this protocol.
+  @Override
+  public long renewDelegationToken(Token<DelegationTokenIdentifier> arg0)
+      throws IOException, InterruptedException {
+    throw new UnsupportedOperationException("Use Token.renew instead");
+  }
+
+
+  // Per-job queries below are served through the cached per-job client
+  // obtained from the ClientCache.
+  @Override
+  public Counters getJobCounters(JobID arg0) throws IOException,
+      InterruptedException {
+    return clientCache.getClient(arg0).getJobCounters(arg0);
+  }
+
+  @Override
+  public String getJobHistoryDir() throws IOException, InterruptedException {
+    return JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
+  }
+
+  @Override
+  public JobStatus getJobStatus(JobID jobID) throws IOException,
+      InterruptedException {
+    JobStatus status = clientCache.getClient(jobID).getJobStatus(jobID);
+    return status;
+  }
+
+  @Override
+  public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1,
+      int arg2) throws IOException, InterruptedException {
+    return clientCache.getClient(arg0).getTaskCompletionEvents(arg0, arg1, arg2);
+  }
+
+  @Override
+  public String[] getTaskDiagnostics(TaskAttemptID arg0) throws IOException,
+      InterruptedException {
+    return clientCache.getClient(arg0.getJobID()).getTaskDiagnostics(arg0);
+  }
+
+  @Override
+  public TaskReport[] getTaskReports(JobID jobID, TaskType taskType)
+  throws IOException, InterruptedException {
+    return clientCache.getClient(jobID)
+        .getTaskReports(jobID, taskType);
+  }
+
+  /**
+   * Kills the job. A non-RUNNING job is killed directly at the RM. A
+   * RUNNING job first gets a kill sent to its AM, which is polled for up
+   * to ~10s (1s intervals); if the job still has not reported KILLED the
+   * application is killed at the RM as a fallback.
+   */
+  @Override
+  public void killJob(JobID arg0) throws IOException, InterruptedException {
+    /* check if the status is not running, if not send kill to RM */
+    JobStatus status = clientCache.getClient(arg0).getJobStatus(arg0);
+    if (status.getState() != JobStatus.State.RUNNING) {
+      resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
+      return;
+    }
+
+    try {
+      /* send a kill to the AM */
+      clientCache.getClient(arg0).killJob(arg0);
+      long currentTimeMillis = System.currentTimeMillis();
+      long timeKillIssued = currentTimeMillis;
+      while ((currentTimeMillis < timeKillIssued + 10000L) && (status.getState()
+          != JobStatus.State.KILLED)) {
+          try {
+            Thread.sleep(1000L);
+          } catch(InterruptedException ie) {
+            /** interrupted, just break */
+            break;
+          }
+          currentTimeMillis = System.currentTimeMillis();
+          status = clientCache.getClient(arg0).getJobStatus(arg0);
+      }
+    } catch(IOException io) {
+      // Best-effort: AM may already be gone; fall through to the RM kill.
+      LOG.debug("Error when checking for application status", io);
+    }
+    if (status.getState() != JobStatus.State.KILLED) {
+      resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
+    }
+  }
+
+  @Override
+  public boolean killTask(TaskAttemptID arg0, boolean arg1) throws IOException,
+      InterruptedException {
+    return clientCache.getClient(arg0.getJobID()).killTask(arg0, arg1);
+  }
+
+  // Queue admin ACLs are wide-open here ("*" = everyone); queue-level
+  // authorization is not enforced through this client.
+  @Override
+  public AccessControlList getQueueAdmins(String arg0) throws IOException {
+    return new AccessControlList("*");
+  }
+
+  // Under YARN there is no JobTracker to be down; always report RUNNING.
+  @Override
+  public JobTrackerStatus getJobTrackerStatus() throws IOException,
+      InterruptedException {
+    return JobTrackerStatus.RUNNING;
+  }
+
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion,
+        clientMethodsHash);
+  }
+
+  @Override
+  public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID)
+      throws IOException {
+    return clientCache.getClient(jobID).getLogFilePath(jobID, taskAttemptID);
+  }
+
+  /**
+   * Emits a warning when {@code -Djava.library.path} appears in a JVM
+   * options config value, since native-library paths should instead be
+   * placed on LD_LIBRARY_PATH through the corresponding env config.
+   */
+  private static void warnForJavaLibPath(String opts, String component,
+      String javaConf, String envConf) {
+    if (opts == null || !opts.contains("-Djava.library.path")) {
+      return;
+    }
+    LOG.warn("Usage of -Djava.library.path in " + javaConf + " can cause " +
+             "programs to no longer function if hadoop native libraries " +
+             "are used. These values should be set as part of the " +
+             "LD_LIBRARY_PATH in the " + component + " JVM env using " +
+             envConf + " config settings.");
+  }
+}

Added: incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YarnTezClientProtocolProvider.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YarnTezClientProtocolProvider.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YarnTezClientProtocolProvider.java (added)
+++ incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YarnTezClientProtocolProvider.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
+
+/**
+ * ClientProtocolProvider that hands out a {@link YARNRunner} whenever the
+ * configured MR framework name selects the yarn-tez framework; otherwise it
+ * declines (returns null) so another provider can be tried.
+ */
+public class YarnTezClientProtocolProvider extends ClientProtocolProvider {
+
+  @Override
+  public ClientProtocol create(Configuration conf) throws IOException {
+    String framework = conf.get(MRConfig.FRAMEWORK_NAME);
+    if (!MRConfig.YARN_TEZ_FRAMEWORK_NAME.equals(framework)) {
+      // Not our framework; let the next provider handle it.
+      return null;
+    }
+    return new YARNRunner(conf);
+  }
+
+  @Override
+  public ClientProtocol create(InetSocketAddress addr, Configuration conf)
+      throws IOException {
+    // The address is irrelevant for YARN; defer to the conf-based factory.
+    return create(conf);
+  }
+
+  @Override
+  public void close(ClientProtocol clientProtocol) throws IOException {
+    // nothing to do
+  }
+
+}

Added: incubator/tez/tez-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider (added)
+++ incubator/tez/tez-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider Fri Mar 15 21:26:36 2013
@@ -0,0 +1,14 @@
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+org.apache.tez.mapreduce.YarnTezClientProtocolProvider