You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ac...@apache.org on 2013/03/15 22:26:48 UTC
svn commit: r1457129 [38/38] - in /incubator/tez: ./ tez-ampool/
tez-ampool/src/ tez-ampool/src/main/ tez-ampool/src/main/bin/
tez-ampool/src/main/conf/ tez-ampool/src/main/java/
tez-ampool/src/main/java/org/ tez-ampool/src/main/java/org/apache/ tez-am...
Added: incubator/tez/tez-yarn-client/.classpath
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-client/.classpath?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-client/.classpath (added)
+++ incubator/tez/tez-yarn-client/.classpath Fri Mar 15 21:26:36 2013
@@ -0,0 +1,105 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<classpath>
+ <classpathentry including="**/*.java" kind="src" path="src/main/java"/>
+ <classpathentry excluding="**/*.java" kind="src" path="src/main/resources"/>
+ <classpathentry kind="var" path="M2_REPO/javax/activation/activation/1.1/activation-1.1.jar"/>
+ <classpathentry kind="var" path="M2_REPO/javax/inject/javax.inject/1/javax.inject-1.jar"/>
+ <classpathentry kind="var" path="M2_REPO/javax/xml/bind/jaxb-api/2.2.2/jaxb-api-2.2.2.jar"/>
+ <classpathentry kind="var" path="M2_REPO/javax/servlet/jsp/jsp-api/2.1/jsp-api-2.1.jar"/>
+ <classpathentry kind="var" path="M2_REPO/javax/servlet/servlet-api/2.5/servlet-api-2.5.jar"/>
+ <classpathentry kind="var" path="M2_REPO/aopalliance/aopalliance/1.0/aopalliance-1.0.jar"/>
+ <classpathentry kind="var" path="M2_REPO/asm/asm/3.1/asm-3.1.jar"/>
+ <classpathentry kind="var" path="M2_REPO/org/apache/avro/avro/1.5.3/avro-1.5.3.jar"/>
+ <classpathentry kind="var" path="M2_REPO/commons-beanutils/commons-beanutils/1.7.0/commons-beanutils-1.7.0.jar"/>
+ <classpathentry kind="var" path="M2_REPO/commons-beanutils/commons-beanutils-core/1.8.0/commons-beanutils-core-1.8.0.jar"/>
+ <classpathentry kind="var" path="M2_REPO/commons-cli/commons-cli/1.2/commons-cli-1.2.jar"/>
+ <classpathentry kind="var" path="M2_REPO/commons-codec/commons-codec/1.4/commons-codec-1.4.jar"/>
+ <classpathentry kind="var" path="M2_REPO/commons-collections/commons-collections/3.2.1/commons-collections-3.2.1.jar"/>
+ <classpathentry kind="var" path="M2_REPO/org/apache/commons/commons-compress/1.4/commons-compress-1.4.jar"/>
+ <classpathentry kind="var" path="M2_REPO/commons-configuration/commons-configuration/1.6/commons-configuration-1.6.jar"/>
+ <classpathentry kind="var" path="M2_REPO/commons-digester/commons-digester/1.8/commons-digester-1.8.jar"/>
+ <classpathentry kind="var" path="M2_REPO/commons-el/commons-el/1.0/commons-el-1.0.jar"/>
+ <classpathentry kind="var" path="M2_REPO/commons-httpclient/commons-httpclient/3.1/commons-httpclient-3.1.jar"/>
+ <classpathentry kind="var" path="M2_REPO/commons-io/commons-io/2.1/commons-io-2.1.jar"/>
+ <classpathentry kind="var" path="M2_REPO/commons-lang/commons-lang/2.5/commons-lang-2.5.jar"/>
+ <classpathentry kind="var" path="M2_REPO/commons-logging/commons-logging/1.1.1/commons-logging-1.1.1.jar"/>
+ <classpathentry kind="var" path="M2_REPO/org/apache/commons/commons-math/2.1/commons-math-2.1.jar"/>
+ <classpathentry kind="var" path="M2_REPO/commons-net/commons-net/3.1/commons-net-3.1.jar"/>
+ <classpathentry kind="var" path="M2_REPO/com/google/guava/guava/11.0.2/guava-11.0.2.jar"/>
+ <classpathentry kind="var" path="M2_REPO/com/google/inject/guice/3.0/guice-3.0.jar"/>
+ <classpathentry kind="var" path="M2_REPO/com/google/inject/extensions/guice-assistedinject/3.0/guice-assistedinject-3.0.jar"/>
+ <classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-annotations/3.0.0-SNAPSHOT/hadoop-annotations-3.0.0-SNAPSHOT.jar"/>
+ <classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-auth/3.0.0-SNAPSHOT/hadoop-auth-3.0.0-SNAPSHOT.jar"/>
+ <classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-common/3.0.0-SNAPSHOT/hadoop-common-3.0.0-SNAPSHOT.jar" sourcepath="M2_REPO/org/apache/hadoop/hadoop-common/3.0.0-SNAPSHOT/hadoop-common-3.0.0-SNAPSHOT-sources.jar">
+ <attributes>
+ <attribute name="javadoc_location" value="jar:file:/Users/acmurthy/.m2/repository/org/apache/hadoop/hadoop-common/3.0.0-SNAPSHOT/hadoop-common-3.0.0-SNAPSHOT-javadoc.jar!/"/>
+ </attributes>
+ </classpathentry>
+ <classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-mapreduce-client-common/3.0.0-SNAPSHOT/hadoop-mapreduce-client-common-3.0.0-SNAPSHOT.jar" sourcepath="M2_REPO/org/apache/hadoop/hadoop-mapreduce-client-common/3.0.0-SNAPSHOT/hadoop-mapreduce-client-common-3.0.0-SNAPSHOT-sources.jar">
+ <attributes>
+ <attribute name="javadoc_location" value="jar:file:/Users/acmurthy/.m2/repository/org/apache/hadoop/hadoop-mapreduce-client-common/3.0.0-SNAPSHOT/hadoop-mapreduce-client-common-3.0.0-SNAPSHOT-javadoc.jar!/"/>
+ </attributes>
+ </classpathentry>
+ <classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-mapreduce-client-core/3.0.0-SNAPSHOT/hadoop-mapreduce-client-core-3.0.0-SNAPSHOT.jar" sourcepath="M2_REPO/org/apache/hadoop/hadoop-mapreduce-client-core/3.0.0-SNAPSHOT/hadoop-mapreduce-client-core-3.0.0-SNAPSHOT-sources.jar">
+ <attributes>
+ <attribute name="javadoc_location" value="jar:file:/Users/acmurthy/.m2/repository/org/apache/hadoop/hadoop-mapreduce-client-core/3.0.0-SNAPSHOT/hadoop-mapreduce-client-core-3.0.0-SNAPSHOT-javadoc.jar!/"/>
+ </attributes>
+ </classpathentry>
+ <classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-mapreduce-client-shuffle/3.0.0-SNAPSHOT/hadoop-mapreduce-client-shuffle-3.0.0-SNAPSHOT.jar" sourcepath="M2_REPO/org/apache/hadoop/hadoop-mapreduce-client-shuffle/3.0.0-SNAPSHOT/hadoop-mapreduce-client-shuffle-3.0.0-SNAPSHOT-sources.jar">
+ <attributes>
+ <attribute name="javadoc_location" value="jar:file:/Users/acmurthy/.m2/repository/org/apache/hadoop/hadoop-mapreduce-client-shuffle/3.0.0-SNAPSHOT/hadoop-mapreduce-client-shuffle-3.0.0-SNAPSHOT-javadoc.jar!/"/>
+ </attributes>
+ </classpathentry>
+ <classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-yarn-api/3.0.0-SNAPSHOT/hadoop-yarn-api-3.0.0-SNAPSHOT.jar" sourcepath="M2_REPO/org/apache/hadoop/hadoop-yarn-api/3.0.0-SNAPSHOT/hadoop-yarn-api-3.0.0-SNAPSHOT-sources.jar">
+ <attributes>
+ <attribute name="javadoc_location" value="jar:file:/Users/acmurthy/.m2/repository/org/apache/hadoop/hadoop-yarn-api/3.0.0-SNAPSHOT/hadoop-yarn-api-3.0.0-SNAPSHOT-javadoc.jar!/"/>
+ </attributes>
+ </classpathentry>
+ <classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-yarn-common/3.0.0-SNAPSHOT/hadoop-yarn-common-3.0.0-SNAPSHOT.jar" sourcepath="M2_REPO/org/apache/hadoop/hadoop-yarn-common/3.0.0-SNAPSHOT/hadoop-yarn-common-3.0.0-SNAPSHOT-sources.jar">
+ <attributes>
+ <attribute name="javadoc_location" value="jar:file:/Users/acmurthy/.m2/repository/org/apache/hadoop/hadoop-yarn-common/3.0.0-SNAPSHOT/hadoop-yarn-common-3.0.0-SNAPSHOT-javadoc.jar!/"/>
+ </attributes>
+ </classpathentry>
+ <classpathentry kind="var" path="M2_REPO/org/apache/hadoop/hadoop-yarn-server-nodemanager/3.0.0-SNAPSHOT/hadoop-yarn-server-nodemanager-3.0.0-SNAPSHOT.jar" sourcepath="M2_REPO/org/apache/hadoop/hadoop-yarn-server-nodemanager/3.0.0-SNAPSHOT/hadoop-yarn-server-nodemanager-3.0.0-SNAPSHOT-sources.jar">
+ <attributes>
+ <attribute name="javadoc_location" value="jar:file:/Users/acmurthy/.m2/repository/org/apache/hadoop/hadoop-yarn-server-nodemanager/3.0.0-SNAPSHOT/hadoop-yarn-server-nodemanager-3.0.0-SNAPSHOT-javadoc.jar!/"/>
+ </attributes>
+ </classpathentry>
+ <classpathentry kind="var" path="M2_REPO/org/hamcrest/hamcrest-core/1.3/hamcrest-core-1.3.jar"/>
+ <classpathentry kind="var" path="M2_REPO/org/codehaus/jackson/jackson-core-asl/1.8.8/jackson-core-asl-1.8.8.jar"/>
+ <classpathentry kind="var" path="M2_REPO/org/codehaus/jackson/jackson-jaxrs/1.7.1/jackson-jaxrs-1.7.1.jar"/>
+ <classpathentry kind="var" path="M2_REPO/org/codehaus/jackson/jackson-mapper-asl/1.8.8/jackson-mapper-asl-1.8.8.jar"/>
+ <classpathentry kind="var" path="M2_REPO/org/codehaus/jackson/jackson-xc/1.7.1/jackson-xc-1.7.1.jar"/>
+ <classpathentry kind="var" path="M2_REPO/tomcat/jasper-compiler/5.5.23/jasper-compiler-5.5.23.jar"/>
+ <classpathentry kind="var" path="M2_REPO/tomcat/jasper-runtime/5.5.23/jasper-runtime-5.5.23.jar"/>
+ <classpathentry kind="var" path="M2_REPO/com/sun/xml/bind/jaxb-impl/2.2.3-1/jaxb-impl-2.2.3-1.jar"/>
+ <classpathentry kind="var" path="M2_REPO/com/sun/jersey/jersey-core/1.8/jersey-core-1.8.jar"/>
+ <classpathentry kind="var" path="M2_REPO/com/sun/jersey/jersey-json/1.8/jersey-json-1.8.jar"/>
+ <classpathentry kind="var" path="M2_REPO/com/sun/jersey/jersey-server/1.8/jersey-server-1.8.jar"/>
+ <classpathentry kind="var" path="M2_REPO/net/java/dev/jets3t/jets3t/0.6.1/jets3t-0.6.1.jar"/>
+ <classpathentry kind="var" path="M2_REPO/org/codehaus/jettison/jettison/1.1/jettison-1.1.jar"/>
+ <classpathentry kind="var" path="M2_REPO/org/mortbay/jetty/jetty/6.1.26/jetty-6.1.26.jar"/>
+ <classpathentry kind="var" path="M2_REPO/org/mortbay/jetty/jetty-util/6.1.26/jetty-util-6.1.26.jar"/>
+ <classpathentry kind="var" path="M2_REPO/jline/jline/0.9.94/jline-0.9.94.jar"/>
+ <classpathentry kind="var" path="M2_REPO/com/jcraft/jsch/0.1.42/jsch-0.1.42.jar"/>
+ <classpathentry kind="var" path="M2_REPO/com/google/code/findbugs/jsr305/1.3.9/jsr305-1.3.9.jar"/>
+ <classpathentry kind="var" path="M2_REPO/junit/junit/4.11/junit-4.11.jar"/>
+ <classpathentry kind="var" path="M2_REPO/log4j/log4j/1.2.17/log4j-1.2.17.jar"/>
+ <classpathentry kind="var" path="M2_REPO/org/jboss/netty/netty/3.2.2.Final/netty-3.2.2.Final.jar"/>
+ <classpathentry kind="var" path="M2_REPO/com/thoughtworks/paranamer/paranamer/2.3/paranamer-2.3.jar"/>
+ <classpathentry kind="var" path="M2_REPO/com/google/protobuf/protobuf-java/2.4.0a/protobuf-java-2.4.0a.jar"/>
+ <classpathentry kind="var" path="M2_REPO/org/slf4j/slf4j-api/1.6.1/slf4j-api-1.6.1.jar"/>
+ <classpathentry kind="var" path="M2_REPO/org/slf4j/slf4j-log4j12/1.6.1/slf4j-log4j12-1.6.1.jar"/>
+ <classpathentry kind="var" path="M2_REPO/org/xerial/snappy/snappy-java/1.0.3.2/snappy-java-1.0.3.2.jar"/>
+ <classpathentry kind="var" path="M2_REPO/stax/stax-api/1.0.1/stax-api-1.0.1.jar"/>
+ <classpathentry kind="src" path="/tez-api"/>
+ <classpathentry kind="src" path="/tez-common"/>
+ <classpathentry kind="src" path="/tez-engine"/>
+ <classpathentry kind="src" path="/tez-mapreduce"/>
+ <classpathentry kind="var" path="M2_REPO/xmlenc/xmlenc/0.52/xmlenc-0.52.jar"/>
+ <classpathentry kind="var" path="M2_REPO/org/tukaani/xz/1.0/xz-1.0.jar"/>
+ <classpathentry kind="var" path="M2_REPO/org/apache/zookeeper/zookeeper/3.4.2/zookeeper-3.4.2.jar"/>
+ <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
+ <classpathentry kind="src" path="/hadoop-yarn-client"/>
+ <classpathentry kind="output" path="target/classes"/>
+</classpath>
Added: incubator/tez/tez-yarn-client/.project
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-client/.project?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-client/.project (added)
+++ incubator/tez/tez-yarn-client/.project Fri Mar 15 21:26:36 2013
@@ -0,0 +1,18 @@
+<projectDescription>
+ <name>tez-yarn-client</name>
+ <comment>NO_M2ECLIPSE_SUPPORT: Project files created with the maven-eclipse-plugin are not supported in M2Eclipse.</comment>
+ <projects>
+ <project>tez-api</project>
+ <project>tez-common</project>
+ <project>tez-engine</project>
+ <project>tez-mapreduce</project>
+ </projects>
+ <buildSpec>
+ <buildCommand>
+ <name>org.eclipse.jdt.core.javabuilder</name>
+ </buildCommand>
+ </buildSpec>
+ <natures>
+ <nature>org.eclipse.jdt.core.javanature</nature>
+ </natures>
+</projectDescription>
\ No newline at end of file
Added: incubator/tez/tez-yarn-client/pom.xml
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-client/pom.xml?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-client/pom.xml (added)
+++ incubator/tez/tez-yarn-client/pom.xml Fri Mar 15 21:26:36 2013
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez</artifactId>
+ <version>0.1.0</version>
+ </parent>
+ <artifactId>tez-yarn-client</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-mapreduce</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
Added: incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java (added)
+++ incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientCache.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+
+public class ClientCache {
+
+ private final Configuration conf;
+ private final ResourceMgrDelegate rm;
+
+ private static final Log LOG = LogFactory.getLog(ClientCache.class);
+
+ private Map<JobID, ClientServiceDelegate> cache =
+ new HashMap<JobID, ClientServiceDelegate>();
+
+ private MRClientProtocol hsProxy;
+
+ public ClientCache(Configuration conf, ResourceMgrDelegate rm) {
+ this.conf = conf;
+ this.rm = rm;
+ }
+
+ //TODO: evict from the cache on some threshold
+ public synchronized ClientServiceDelegate getClient(JobID jobId) {
+ if (hsProxy == null) {
+ try {
+ hsProxy = instantiateHistoryProxy();
+ } catch (IOException e) {
+ LOG.warn("Could not connect to History server.", e);
+ throw new YarnException("Could not connect to History server.", e);
+ }
+ }
+ ClientServiceDelegate client = cache.get(jobId);
+ if (client == null) {
+ client = new ClientServiceDelegate(conf, rm, jobId, hsProxy);
+ cache.put(jobId, client);
+ }
+ return client;
+ }
+
+ protected synchronized MRClientProtocol getInitializedHSProxy()
+ throws IOException {
+ if (this.hsProxy == null) {
+ hsProxy = instantiateHistoryProxy();
+ }
+ return this.hsProxy;
+ }
+
+ protected MRClientProtocol instantiateHistoryProxy()
+ throws IOException {
+ final String serviceAddr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS);
+ if (StringUtils.isEmpty(serviceAddr)) {
+ return null;
+ }
+ LOG.debug("Connecting to HistoryServer at: " + serviceAddr);
+ final YarnRPC rpc = YarnRPC.create(conf);
+ LOG.debug("Connected to HistoryServer at: " + serviceAddr);
+ UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+ return currentUser.doAs(new PrivilegedAction<MRClientProtocol>() {
+ @Override
+ public MRClientProtocol run() {
+ return (MRClientProtocol) rpc.getProxy(HSClientProtocol.class,
+ NetUtils.createSocketAddr(serviceAddr), conf);
+ }
+ });
+ }
+}
Added: incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java (added)
+++ incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,473 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.v2.LogParams;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.*;
+import org.apache.hadoop.mapreduce.v2.api.records.*;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.Counters;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ClientToken;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ProtoUtils;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+
+public class ClientServiceDelegate {
+ private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
+ private static final String UNAVAILABLE = "N/A";
+
+ // Caches for per-user NotRunningJobs
+ private HashMap<JobState, HashMap<String, NotRunningJob>> notRunningJobs;
+
+ private final Configuration conf;
+ private final JobID jobId;
+ private final ApplicationId appId;
+ private final ResourceMgrDelegate rm;
+ private final MRClientProtocol historyServerProxy;
+ private MRClientProtocol realProxy = null;
+ private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+ private static String UNKNOWN_USER = "Unknown User";
+ private String trackingUrl;
+
+ private boolean amAclDisabledStatusLogged = false;
+
+ public ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm,
+ JobID jobId, MRClientProtocol historyServerProxy) {
+ this.conf = new Configuration(conf); // Cloning for modifying.
+ // For faster redirects from AM to HS.
+ this.conf.setInt(
+ CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
+ this.conf.getInt(MRJobConfig.MR_CLIENT_TO_AM_IPC_MAX_RETRIES,
+ MRJobConfig.DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES));
+ this.rm = rm;
+ this.jobId = jobId;
+ this.historyServerProxy = historyServerProxy;
+ this.appId = TypeConverter.toYarn(jobId).getAppId();
+ notRunningJobs = new HashMap<JobState, HashMap<String, NotRunningJob>>();
+ }
+
+ // Get the instance of the NotRunningJob corresponding to the specified
+ // user and state
+ private NotRunningJob getNotRunningJob(ApplicationReport applicationReport,
+ JobState state) {
+ synchronized (notRunningJobs) {
+ HashMap<String, NotRunningJob> map = notRunningJobs.get(state);
+ if (map == null) {
+ map = new HashMap<String, NotRunningJob>();
+ notRunningJobs.put(state, map);
+ }
+ String user =
+ (applicationReport == null) ?
+ UNKNOWN_USER : applicationReport.getUser();
+ NotRunningJob notRunningJob = map.get(user);
+ if (notRunningJob == null) {
+ notRunningJob = new NotRunningJob(applicationReport, state);
+ map.put(user, notRunningJob);
+ }
+ return notRunningJob;
+ }
+ }
+
+ private MRClientProtocol getProxy() throws YarnRemoteException {
+ if (realProxy != null) {
+ return realProxy;
+ }
+
+ // Possibly allow nulls through the PB tunnel, otherwise deal with an exception
+ // and redirect to the history server.
+ ApplicationReport application = rm.getApplicationReport(appId);
+ if (application != null) {
+ trackingUrl = application.getTrackingUrl();
+ }
+ InetSocketAddress serviceAddr = null;
+ while (application == null
+ || YarnApplicationState.RUNNING == application
+ .getYarnApplicationState()) {
+ if (application == null) {
+ LOG.info("Could not get Job info from RM for job " + jobId
+ + ". Redirecting to job history server.");
+ return checkAndGetHSProxy(null, JobState.NEW);
+ }
+ try {
+ if (application.getHost() == null || "".equals(application.getHost())) {
+ LOG.debug("AM not assigned to Job. Waiting to get the AM ...");
+ Thread.sleep(2000);
+
+ LOG.debug("Application state is " + application.getYarnApplicationState());
+ application = rm.getApplicationReport(appId);
+ continue;
+ } else if (UNAVAILABLE.equals(application.getHost())) {
+ if (!amAclDisabledStatusLogged) {
+ LOG.info("Job " + jobId + " is running, but the host is unknown."
+ + " Verify user has VIEW_JOB access.");
+ amAclDisabledStatusLogged = true;
+ }
+ return getNotRunningJob(application, JobState.RUNNING);
+ }
+ if(!conf.getBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED, false)) {
+ UserGroupInformation newUgi = UserGroupInformation.createRemoteUser(
+ UserGroupInformation.getCurrentUser().getUserName());
+ serviceAddr = NetUtils.createSocketAddrForHost(
+ application.getHost(), application.getRpcPort());
+ if (UserGroupInformation.isSecurityEnabled()) {
+ ClientToken clientToken = application.getClientToken();
+ Token<ClientTokenIdentifier> token =
+ ProtoUtils.convertFromProtoFormat(clientToken, serviceAddr);
+ newUgi.addToken(token);
+ }
+ LOG.debug("Connecting to " + serviceAddr);
+ final InetSocketAddress finalServiceAddr = serviceAddr;
+ realProxy = newUgi.doAs(new PrivilegedExceptionAction<MRClientProtocol>() {
+ @Override
+ public MRClientProtocol run() throws IOException {
+ return instantiateAMProxy(finalServiceAddr);
+ }
+ });
+ } else {
+ if (!amAclDisabledStatusLogged) {
+ LOG.info("Network ACL closed to AM for job " + jobId
+ + ". Not going to try to reach the AM.");
+ amAclDisabledStatusLogged = true;
+ }
+ return getNotRunningJob(null, JobState.RUNNING);
+ }
+ return realProxy;
+ } catch (IOException e) {
+ //possibly the AM has crashed
+ //there may be some time before AM is restarted
+ //keep retrying by getting the address from RM
+ LOG.info("Could not connect to " + serviceAddr +
+ ". Waiting for getting the latest AM address...");
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e1) {
+ LOG.warn("getProxy() call interruped", e1);
+ throw new YarnException(e1);
+ }
+ application = rm.getApplicationReport(appId);
+ if (application == null) {
+ LOG.info("Could not get Job info from RM for job " + jobId
+ + ". Redirecting to job history server.");
+ return checkAndGetHSProxy(null, JobState.RUNNING);
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("getProxy() call interruped", e);
+ throw new YarnException(e);
+ }
+ }
+
+ /** we just want to return if its allocating, so that we don't
+ * block on it. This is to be able to return job status
+ * on an allocating Application.
+ */
+ String user = application.getUser();
+ if (user == null) {
+ throw RPCUtil.getRemoteException("User is not set in the application report");
+ }
+ if (application.getYarnApplicationState() == YarnApplicationState.NEW
+ || application.getYarnApplicationState() == YarnApplicationState.SUBMITTED
+ || application.getYarnApplicationState() == YarnApplicationState.ACCEPTED) {
+ realProxy = null;
+ return getNotRunningJob(application, JobState.NEW);
+ }
+
+ if (application.getYarnApplicationState() == YarnApplicationState.FAILED) {
+ realProxy = null;
+ return getNotRunningJob(application, JobState.FAILED);
+ }
+
+ if (application.getYarnApplicationState() == YarnApplicationState.KILLED) {
+ realProxy = null;
+ return getNotRunningJob(application, JobState.KILLED);
+ }
+
+ //History server can serve a job only if application
+ //succeeded.
+ if (application.getYarnApplicationState() == YarnApplicationState.FINISHED) {
+ LOG.info("Application state is completed. FinalApplicationStatus="
+ + application.getFinalApplicationStatus().toString()
+ + ". Redirecting to job history server");
+ realProxy = checkAndGetHSProxy(application, JobState.SUCCEEDED);
+ }
+ return realProxy;
+ }
+
+ private MRClientProtocol checkAndGetHSProxy(
+ ApplicationReport applicationReport, JobState state) {
+ if (null == historyServerProxy) {
+ LOG.warn("Job History Server is not configured.");
+ return getNotRunningJob(applicationReport, state);
+ }
+ return historyServerProxy;
+ }
+
+ MRClientProtocol instantiateAMProxy(final InetSocketAddress serviceAddr)
+ throws IOException {
+ LOG.trace("Connecting to ApplicationMaster at: " + serviceAddr);
+ YarnRPC rpc = YarnRPC.create(conf);
+ MRClientProtocol proxy =
+ (MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
+ serviceAddr, conf);
+ LOG.trace("Connected to ApplicationMaster at: " + serviceAddr);
+ return proxy;
+ }
+
+ private synchronized Object invoke(String method, Class argClass,
+ Object args) throws IOException {
+ Method methodOb = null;
+ try {
+ methodOb = MRClientProtocol.class.getMethod(method, argClass);
+ } catch (SecurityException e) {
+ throw new YarnException(e);
+ } catch (NoSuchMethodException e) {
+ throw new YarnException("Method name mismatch", e);
+ }
+ int maxRetries = this.conf.getInt(
+ MRJobConfig.MR_CLIENT_MAX_RETRIES,
+ MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES);
+ IOException lastException = null;
+ while (maxRetries > 0) {
+ try {
+ return methodOb.invoke(getProxy(), args);
+ } catch (YarnRemoteException yre) {
+ LOG.warn("Exception thrown by remote end.", yre);
+ throw yre;
+ } catch (InvocationTargetException e) {
+ if (e.getTargetException() instanceof YarnRemoteException) {
+ LOG.warn("Error from remote end: " + e
+ .getTargetException().getLocalizedMessage());
+ LOG.debug("Tracing remote error ", e.getTargetException());
+ throw (YarnRemoteException) e.getTargetException();
+ }
+ LOG.debug("Failed to contact AM/History for job " + jobId +
+ " retrying..", e.getTargetException());
+ // Force reconnection by setting the proxy to null.
+ realProxy = null;
+ // HS/AMS shut down
+ maxRetries--;
+ lastException = new IOException(e.getMessage());
+
+ } catch (Exception e) {
+ LOG.debug("Failed to contact AM/History for job " + jobId
+ + " Will retry..", e);
+ // Force reconnection by setting the proxy to null.
+ realProxy = null;
+ // RM shutdown
+ maxRetries--;
+ lastException = new IOException(e.getMessage());
+ }
+ }
+ throw lastException;
+ }
+
+ public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
+ InterruptedException {
+ org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter.toYarn(arg0);
+ GetCountersRequest request = recordFactory.newRecordInstance(GetCountersRequest.class);
+ request.setJobId(jobID);
+ Counters cnt = ((GetCountersResponse)
+ invoke("getCounters", GetCountersRequest.class, request)).getCounters();
+ return TypeConverter.fromYarn(cnt);
+
+ }
+
+ public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1, int arg2)
+ throws IOException, InterruptedException {
+ org.apache.hadoop.mapreduce.v2.api.records.JobId jobID = TypeConverter
+ .toYarn(arg0);
+ GetTaskAttemptCompletionEventsRequest request = recordFactory
+ .newRecordInstance(GetTaskAttemptCompletionEventsRequest.class);
+ request.setJobId(jobID);
+ request.setFromEventId(arg1);
+ request.setMaxEvents(arg2);
+ List<org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent> list =
+ ((GetTaskAttemptCompletionEventsResponse) invoke(
+ "getTaskAttemptCompletionEvents", GetTaskAttemptCompletionEventsRequest.class, request)).
+ getCompletionEventList();
+ return TypeConverter
+ .fromYarn(list
+ .toArray(new org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent[0]));
+ }
+
+ public String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID arg0)
+ throws IOException, InterruptedException {
+
+ org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = TypeConverter
+ .toYarn(arg0);
+ GetDiagnosticsRequest request = recordFactory
+ .newRecordInstance(GetDiagnosticsRequest.class);
+ request.setTaskAttemptId(attemptID);
+ List<String> list = ((GetDiagnosticsResponse) invoke("getDiagnostics",
+ GetDiagnosticsRequest.class, request)).getDiagnosticsList();
+ String[] result = new String[list.size()];
+ int i = 0;
+ for (String c : list) {
+ result[i++] = c.toString();
+ }
+ return result;
+ }
+
+ public JobStatus getJobStatus(JobID oldJobID) throws IOException {
+ org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
+ TypeConverter.toYarn(oldJobID);
+ GetJobReportRequest request =
+ recordFactory.newRecordInstance(GetJobReportRequest.class);
+ request.setJobId(jobId);
+ JobReport report = ((GetJobReportResponse) invoke("getJobReport",
+ GetJobReportRequest.class, request)).getJobReport();
+ JobStatus jobStatus = null;
+ if (report != null) {
+ if (StringUtils.isEmpty(report.getJobFile())) {
+ String jobFile = MRApps.getJobFile(conf, report.getUser(), oldJobID);
+ report.setJobFile(jobFile);
+ }
+ String historyTrackingUrl = report.getTrackingUrl();
+ String url = StringUtils.isNotEmpty(historyTrackingUrl)
+ ? historyTrackingUrl : trackingUrl;
+ if (!UNAVAILABLE.equals(url)) {
+ url = HttpConfig.getSchemePrefix() + url;
+ }
+ jobStatus = TypeConverter.fromYarn(report, url);
+ }
+ return jobStatus;
+ }
+
+ public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID oldJobID, TaskType taskType)
+ throws IOException{
+ org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
+ TypeConverter.toYarn(oldJobID);
+ GetTaskReportsRequest request =
+ recordFactory.newRecordInstance(GetTaskReportsRequest.class);
+ request.setJobId(jobId);
+ request.setTaskType(TypeConverter.toYarn(taskType));
+
+ List<org.apache.hadoop.mapreduce.v2.api.records.TaskReport> taskReports =
+ ((GetTaskReportsResponse) invoke("getTaskReports", GetTaskReportsRequest.class,
+ request)).getTaskReportList();
+
+ return TypeConverter.fromYarn
+ (taskReports).toArray(new org.apache.hadoop.mapreduce.TaskReport[0]);
+ }
+
+ public boolean killTask(TaskAttemptID taskAttemptID, boolean fail)
+ throws IOException {
+ org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID
+ = TypeConverter.toYarn(taskAttemptID);
+ if (fail) {
+ FailTaskAttemptRequest failRequest = recordFactory.newRecordInstance(FailTaskAttemptRequest.class);
+ failRequest.setTaskAttemptId(attemptID);
+ invoke("failTaskAttempt", FailTaskAttemptRequest.class, failRequest);
+ } else {
+ KillTaskAttemptRequest killRequest = recordFactory.newRecordInstance(KillTaskAttemptRequest.class);
+ killRequest.setTaskAttemptId(attemptID);
+ invoke("killTaskAttempt", KillTaskAttemptRequest.class, killRequest);
+ }
+ return true;
+ }
+
+ public boolean killJob(JobID oldJobID)
+ throws IOException {
+ org.apache.hadoop.mapreduce.v2.api.records.JobId jobId
+ = TypeConverter.toYarn(oldJobID);
+ KillJobRequest killRequest = recordFactory.newRecordInstance(KillJobRequest.class);
+ killRequest.setJobId(jobId);
+ invoke("killJob", KillJobRequest.class, killRequest);
+ return true;
+ }
+
+ public LogParams getLogFilePath(JobID oldJobID, TaskAttemptID oldTaskAttemptID)
+ throws YarnRemoteException, IOException {
+ org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
+ TypeConverter.toYarn(oldJobID);
+ GetJobReportRequest request =
+ recordFactory.newRecordInstance(GetJobReportRequest.class);
+ request.setJobId(jobId);
+
+ JobReport report =
+ ((GetJobReportResponse) invoke("getJobReport",
+ GetJobReportRequest.class, request)).getJobReport();
+ if (EnumSet.of(JobState.SUCCEEDED, JobState.FAILED, JobState.KILLED,
+ JobState.ERROR).contains(report.getJobState())) {
+ if (oldTaskAttemptID != null) {
+ GetTaskAttemptReportRequest taRequest =
+ recordFactory.newRecordInstance(GetTaskAttemptReportRequest.class);
+ taRequest.setTaskAttemptId(TypeConverter.toYarn(oldTaskAttemptID));
+ TaskAttemptReport taReport =
+ ((GetTaskAttemptReportResponse) invoke("getTaskAttemptReport",
+ GetTaskAttemptReportRequest.class, taRequest))
+ .getTaskAttemptReport();
+ if (taReport.getContainerId() == null
+ || taReport.getNodeManagerHost() == null) {
+ throw new IOException("Unable to get log information for task: "
+ + oldTaskAttemptID);
+ }
+ return new LogParams(
+ taReport.getContainerId().toString(),
+ taReport.getContainerId().getApplicationAttemptId()
+ .getApplicationId().toString(),
+ BuilderUtils.newNodeId(taReport.getNodeManagerHost(),
+ taReport.getNodeManagerPort()).toString(), report.getUser());
+ } else {
+ if (report.getAMInfos() == null || report.getAMInfos().size() == 0) {
+ throw new IOException("Unable to get log information for job: "
+ + oldJobID);
+ }
+ AMInfo amInfo = report.getAMInfos().get(report.getAMInfos().size() - 1);
+ return new LogParams(
+ amInfo.getContainerId().toString(),
+ amInfo.getAppAttemptId().getApplicationId().toString(),
+ BuilderUtils.newNodeId(amInfo.getNodeManagerHost(),
+ amInfo.getNodeManagerPort()).toString(), report.getUser());
+ }
+ } else {
+ throw new IOException("Cannot get log path for a in-progress job");
+ }
+ }
+}
Added: incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/NotRunningJob.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/NotRunningJob.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/NotRunningJob.java (added)
+++ incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/NotRunningJob.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,241 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
+import org.apache.hadoop.mapreduce.v2.api.records.Counters;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+
+public class NotRunningJob implements MRClientProtocol {
+
+ private RecordFactory recordFactory =
+ RecordFactoryProvider.getRecordFactory(null);
+
+ private final JobState jobState;
+ private final ApplicationReport applicationReport;
+
+
+ private ApplicationReport getUnknownApplicationReport() {
+ ApplicationId unknownAppId = recordFactory
+ .newRecordInstance(ApplicationId.class);
+ ApplicationAttemptId unknownAttemptId = recordFactory
+ .newRecordInstance(ApplicationAttemptId.class);
+
+ // Setting AppState to NEW and finalStatus to UNDEFINED as they are never
+ // used for a non running job
+ return BuilderUtils.newApplicationReport(unknownAppId, unknownAttemptId,
+ "N/A", "N/A", "N/A", "N/A", 0, null, YarnApplicationState.NEW, "N/A",
+ "N/A", 0, 0, FinalApplicationStatus.UNDEFINED, null, "N/A");
+ }
+
+ NotRunningJob(ApplicationReport applicationReport, JobState jobState) {
+ this.applicationReport =
+ (applicationReport == null) ?
+ getUnknownApplicationReport() : applicationReport;
+ this.jobState = jobState;
+ }
+
+ @Override
+ public FailTaskAttemptResponse failTaskAttempt(
+ FailTaskAttemptRequest request) throws YarnRemoteException {
+ FailTaskAttemptResponse resp =
+ recordFactory.newRecordInstance(FailTaskAttemptResponse.class);
+ return resp;
+ }
+
+ @Override
+ public GetCountersResponse getCounters(GetCountersRequest request)
+ throws YarnRemoteException {
+ GetCountersResponse resp =
+ recordFactory.newRecordInstance(GetCountersResponse.class);
+ Counters counters = recordFactory.newRecordInstance(Counters.class);
+ counters.addAllCounterGroups(new HashMap<String, CounterGroup>());
+ resp.setCounters(counters);
+ return resp;
+ }
+
+ @Override
+ public GetDiagnosticsResponse getDiagnostics(GetDiagnosticsRequest request)
+ throws YarnRemoteException {
+ GetDiagnosticsResponse resp =
+ recordFactory.newRecordInstance(GetDiagnosticsResponse.class);
+ resp.addDiagnostics("");
+ return resp;
+ }
+
+ @Override
+ public GetJobReportResponse getJobReport(GetJobReportRequest request)
+ throws YarnRemoteException {
+ JobReport jobReport =
+ recordFactory.newRecordInstance(JobReport.class);
+ jobReport.setJobId(request.getJobId());
+ jobReport.setJobState(jobState);
+ jobReport.setUser(applicationReport.getUser());
+ jobReport.setStartTime(applicationReport.getStartTime());
+ jobReport.setDiagnostics(applicationReport.getDiagnostics());
+ jobReport.setJobName(applicationReport.getName());
+ jobReport.setTrackingUrl(applicationReport.getTrackingUrl());
+ jobReport.setFinishTime(applicationReport.getFinishTime());
+
+ GetJobReportResponse resp =
+ recordFactory.newRecordInstance(GetJobReportResponse.class);
+ resp.setJobReport(jobReport);
+ return resp;
+ }
+
+ @Override
+ public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(
+ GetTaskAttemptCompletionEventsRequest request)
+ throws YarnRemoteException {
+ GetTaskAttemptCompletionEventsResponse resp =
+ recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsResponse.class);
+ resp.addAllCompletionEvents(new ArrayList<TaskAttemptCompletionEvent>());
+ return resp;
+ }
+
+ @Override
+ public GetTaskAttemptReportResponse getTaskAttemptReport(
+ GetTaskAttemptReportRequest request) throws YarnRemoteException {
+ //not invoked by anybody
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public GetTaskReportResponse getTaskReport(GetTaskReportRequest request)
+ throws YarnRemoteException {
+ GetTaskReportResponse resp =
+ recordFactory.newRecordInstance(GetTaskReportResponse.class);
+ TaskReport report = recordFactory.newRecordInstance(TaskReport.class);
+ report.setTaskId(request.getTaskId());
+ report.setTaskState(TaskState.NEW);
+ Counters counters = recordFactory.newRecordInstance(Counters.class);
+ counters.addAllCounterGroups(new HashMap<String, CounterGroup>());
+ report.setCounters(counters);
+ report.addAllRunningAttempts(new ArrayList<TaskAttemptId>());
+ return resp;
+ }
+
+ @Override
+ public GetTaskReportsResponse getTaskReports(GetTaskReportsRequest request)
+ throws YarnRemoteException {
+ GetTaskReportsResponse resp =
+ recordFactory.newRecordInstance(GetTaskReportsResponse.class);
+ resp.addAllTaskReports(new ArrayList<TaskReport>());
+ return resp;
+ }
+
+ @Override
+ public KillJobResponse killJob(KillJobRequest request)
+ throws YarnRemoteException {
+ KillJobResponse resp =
+ recordFactory.newRecordInstance(KillJobResponse.class);
+ return resp;
+ }
+
+ @Override
+ public KillTaskResponse killTask(KillTaskRequest request)
+ throws YarnRemoteException {
+ KillTaskResponse resp =
+ recordFactory.newRecordInstance(KillTaskResponse.class);
+ return resp;
+ }
+
+ @Override
+ public KillTaskAttemptResponse killTaskAttempt(
+ KillTaskAttemptRequest request) throws YarnRemoteException {
+ KillTaskAttemptResponse resp =
+ recordFactory.newRecordInstance(KillTaskAttemptResponse.class);
+ return resp;
+ }
+
+ @Override
+ public GetDelegationTokenResponse getDelegationToken(
+ GetDelegationTokenRequest request) throws YarnRemoteException {
+ /* Should not be invoked by anyone. */
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public RenewDelegationTokenResponse renewDelegationToken(
+ RenewDelegationTokenRequest request) throws YarnRemoteException {
+ /* Should not be invoked by anyone. */
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public CancelDelegationTokenResponse cancelDelegationToken(
+ CancelDelegationTokenRequest request) throws YarnRemoteException {
+ /* Should not be invoked by anyone. */
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public InetSocketAddress getConnectAddress() {
+ /* Should not be invoked by anyone. Normally used to set token service */
+ throw new NotImplementedException();
+ }
+}
Added: incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java (added)
+++ incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,168 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.QueueAclsInfo;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ProtoUtils;
+
+public class ResourceMgrDelegate extends YarnClientImpl {
+ private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class);
+
+ private YarnConfiguration conf;
+ private GetNewApplicationResponse application;
+ private ApplicationId applicationId;
+
+ /**
+ * Delegate responsible for communicating with the Resource Manager's {@link ClientRMProtocol}.
+ * @param conf the configuration object.
+ */
+ public ResourceMgrDelegate(YarnConfiguration conf) {
+ super();
+ this.conf = conf;
+ init(conf);
+ start();
+ }
+
+ public TaskTrackerInfo[] getActiveTrackers() throws IOException,
+ InterruptedException {
+ return TypeConverter.fromYarnNodes(super.getNodeReports());
+ }
+
+ public JobStatus[] getAllJobs() throws IOException, InterruptedException {
+ return TypeConverter.fromYarnApps(super.getApplicationList(), this.conf);
+ }
+
+ public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException,
+ InterruptedException {
+ // TODO: Implement getBlacklistedTrackers
+ LOG.warn("getBlacklistedTrackers - Not implemented yet");
+ return new TaskTrackerInfo[0];
+ }
+
+ public ClusterMetrics getClusterMetrics() throws IOException,
+ InterruptedException {
+ YarnClusterMetrics metrics = super.getYarnClusterMetrics();
+ ClusterMetrics oldMetrics = new ClusterMetrics(1, 1, 1, 1, 1, 1,
+ metrics.getNumNodeManagers() * 10, metrics.getNumNodeManagers() * 2, 1,
+ metrics.getNumNodeManagers(), 0, 0);
+ return oldMetrics;
+ }
+
+ @SuppressWarnings("rawtypes")
+ public Token getDelegationToken(Text renewer) throws IOException,
+ InterruptedException {
+ return ProtoUtils.convertFromProtoFormat(
+ super.getRMDelegationToken(renewer), rmAddress);
+ }
+
+ public String getFilesystemName() throws IOException, InterruptedException {
+ return FileSystem.get(conf).getUri().toString();
+ }
+
+ public JobID getNewJobID() throws IOException, InterruptedException {
+ this.application = super.getNewApplication();
+ this.applicationId = this.application.getApplicationId();
+ return TypeConverter.fromYarn(applicationId);
+ }
+
+ public QueueInfo getQueue(String queueName) throws IOException,
+ InterruptedException {
+ return TypeConverter.fromYarn(
+ super.getQueueInfo(queueName), this.conf);
+ }
+
+ public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
+ InterruptedException {
+ return TypeConverter.fromYarnQueueUserAclsInfo(super
+ .getQueueAclsInfo());
+ }
+
+ public QueueInfo[] getQueues() throws IOException, InterruptedException {
+ return TypeConverter.fromYarnQueueInfo(super.getAllQueues(), this.conf);
+ }
+
+ public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
+ return TypeConverter.fromYarnQueueInfo(super.getRootQueueInfos(), this.conf);
+ }
+
+ public QueueInfo[] getChildQueues(String parent) throws IOException,
+ InterruptedException {
+ return TypeConverter.fromYarnQueueInfo(super.getChildQueueInfos(parent),
+ this.conf);
+ }
+
+ public String getStagingAreaDir() throws IOException, InterruptedException {
+// Path path = new Path(MRJobConstants.JOB_SUBMIT_DIR);
+ String user =
+ UserGroupInformation.getCurrentUser().getShortUserName();
+ Path path = MRApps.getStagingAreaDir(conf, user);
+ LOG.debug("getStagingAreaDir: dir=" + path);
+ return path.toString();
+ }
+
+
+ public String getSystemDir() throws IOException, InterruptedException {
+ Path sysDir = new Path(MRJobConfig.JOB_SUBMIT_DIR);
+ //FileContext.getFileContext(conf).delete(sysDir, true);
+ return sysDir.toString();
+ }
+
+
+ public long getTaskTrackerExpiryInterval() throws IOException,
+ InterruptedException {
+ return 0;
+ }
+
+ public void setJobPriority(JobID arg0, String arg1) throws IOException,
+ InterruptedException {
+ return;
+ }
+
+
+ public long getProtocolVersion(String arg0, long arg1) throws IOException {
+ return 0;
+ }
+
+ public ApplicationId getApplicationId() {
+ return applicationId;
+ }
+}
Added: incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java (added)
+++ incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,622 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Master;
+import org.apache.hadoop.mapred.TaskLog;
+import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.hadoop.mapreduce.QueueAclsInfo;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.TaskReport;
+import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.LogParams;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.DelegationToken;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.ProtoUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This class enables the current JobClient (0.22 hadoop) to run on YARN.
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class YARNRunner implements ClientProtocol {
+
+ private static final Log LOG = LogFactory.getLog(YARNRunner.class);
+
+ private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+ private ResourceMgrDelegate resMgrDelegate;
+ private ClientCache clientCache;
+ private Configuration conf;
+ private final FileContext defaultFileContext;
+
+ /* usually is false unless the jobclient get delegation token is
+ * called. This is a hack wherein we do return a token from RM
+ * on getDelegationtoken but due to the restricted api on jobclient
+ * we just add a job history DT token when submitting a job.
+ */
+ private static final boolean DEFAULT_HS_DELEGATION_TOKEN_REQUIRED =
+ false;
+
+ /**
+ * Yarn runner incapsulates the client interface of
+ * yarn
+ * @param conf the configuration object for the client
+ */
+ public YARNRunner(Configuration conf) {
+ this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf)));
+ }
+
+ /**
+ * Similar to {@link #YARNRunner(Configuration)} but allowing injecting
+ * {@link ResourceMgrDelegate}. Enables mocking and testing.
+ * @param conf the configuration object for the client
+ * @param resMgrDelegate the resourcemanager client handle.
+ */
+ public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) {
+ this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate));
+ }
+
+ /**
+ * Similar to {@link YARNRunner#YARNRunner(Configuration, ResourceMgrDelegate)}
+ * but allowing injecting {@link ClientCache}. Enable mocking and testing.
+ * @param conf the configuration object
+ * @param resMgrDelegate the resource manager delegate
+ * @param clientCache the client cache object.
+ */
+ public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
+ ClientCache clientCache) {
+ this.conf = conf;
+ try {
+ this.resMgrDelegate = resMgrDelegate;
+ this.clientCache = clientCache;
+ this.defaultFileContext = FileContext.getFileContext(this.conf);
+ } catch (UnsupportedFileSystemException ufe) {
+ throw new RuntimeException("Error in instantiating YarnClient", ufe);
+ }
+ }
+
+ @Private
+ /**
+ * Used for testing mostly.
+ * @param resMgrDelegate the resource manager delegate to set to.
+ */
+ public void setResourceMgrDelegate(ResourceMgrDelegate resMgrDelegate) {
+ this.resMgrDelegate = resMgrDelegate;
+ }
+
+ @Override
+ public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
+ throws IOException, InterruptedException {
+ throw new UnsupportedOperationException("Use Token.renew instead");
+ }
+
+ @Override
+ public TaskTrackerInfo[] getActiveTrackers() throws IOException,
+ InterruptedException {
+ return resMgrDelegate.getActiveTrackers();
+ }
+
+ @Override
+ public JobStatus[] getAllJobs() throws IOException, InterruptedException {
+ return resMgrDelegate.getAllJobs();
+ }
+
+ @Override
+ public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException,
+ InterruptedException {
+ return resMgrDelegate.getBlacklistedTrackers();
+ }
+
+ @Override
+ public ClusterMetrics getClusterMetrics() throws IOException,
+ InterruptedException {
+ return resMgrDelegate.getClusterMetrics();
+ }
+
+ @VisibleForTesting
+ Token<?> getDelegationTokenFromHS(MRClientProtocol hsProxy)
+ throws IOException, InterruptedException {
+ GetDelegationTokenRequest request = recordFactory
+ .newRecordInstance(GetDelegationTokenRequest.class);
+ request.setRenewer(Master.getMasterPrincipal(conf));
+ DelegationToken mrDelegationToken = hsProxy.getDelegationToken(request)
+ .getDelegationToken();
+ return ProtoUtils.convertFromProtoFormat(mrDelegationToken,
+ hsProxy.getConnectAddress());
+ }
+
+ @Override
+ public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
+ throws IOException, InterruptedException {
+ // The token is only used for serialization. So the type information
+ // mismatch should be fine.
+ return resMgrDelegate.getDelegationToken(renewer);
+ }
+
+ @Override
+ public String getFilesystemName() throws IOException, InterruptedException {
+ return resMgrDelegate.getFilesystemName();
+ }
+
+ @Override
+ public JobID getNewJobID() throws IOException, InterruptedException {
+ return resMgrDelegate.getNewJobID();
+ }
+
+ @Override
+ public QueueInfo getQueue(String queueName) throws IOException,
+ InterruptedException {
+ return resMgrDelegate.getQueue(queueName);
+ }
+
+ @Override
+ public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
+ InterruptedException {
+ return resMgrDelegate.getQueueAclsForCurrentUser();
+ }
+
+ @Override
+ public QueueInfo[] getQueues() throws IOException, InterruptedException {
+ return resMgrDelegate.getQueues();
+ }
+
+ @Override
+ public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
+ return resMgrDelegate.getRootQueues();
+ }
+
+ @Override
+ public QueueInfo[] getChildQueues(String parent) throws IOException,
+ InterruptedException {
+ return resMgrDelegate.getChildQueues(parent);
+ }
+
+ @Override
+ public String getStagingAreaDir() throws IOException, InterruptedException {
+ return resMgrDelegate.getStagingAreaDir();
+ }
+
+ @Override
+ public String getSystemDir() throws IOException, InterruptedException {
+ return resMgrDelegate.getSystemDir();
+ }
+
+ @Override
+ public long getTaskTrackerExpiryInterval() throws IOException,
+ InterruptedException {
+ return resMgrDelegate.getTaskTrackerExpiryInterval();
+ }
+
+ @Override
+ public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
+ throws IOException, InterruptedException {
+
+ /* check if we have a hsproxy, if not, no need */
+ MRClientProtocol hsProxy = clientCache.getInitializedHSProxy();
+ if (hsProxy != null) {
+ // JobClient will set this flag if getDelegationToken is called, if so, get
+ // the delegation tokens for the HistoryServer also.
+
+ // TODO Fix this. Temporarily dumping this into TezConfig
+ if (conf.getBoolean(MRJobConfig.HS_DELEGATION_TOKEN_REQUIRED,
+ DEFAULT_HS_DELEGATION_TOKEN_REQUIRED)) {
+ Token hsDT = getDelegationTokenFromHS(hsProxy);
+ ts.addToken(hsDT.getService(), hsDT);
+ }
+ }
+
+ // Upload only in security mode: TODO
+ Path applicationTokensFile =
+ new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
+ try {
+ ts.writeTokenStorageFile(applicationTokensFile, conf);
+ } catch (IOException e) {
+ throw new YarnException(e);
+ }
+
+ // Construct necessary information to start the MR AM
+ ApplicationSubmissionContext appContext =
+ createApplicationSubmissionContext(conf, jobSubmitDir, ts);
+
+ // Submit to ResourceManager
+ ApplicationId applicationId = resMgrDelegate.submitApplication(appContext);
+
+ ApplicationReport appMaster = resMgrDelegate
+ .getApplicationReport(applicationId);
+ String diagnostics =
+ (appMaster == null ?
+ "application report is null" : appMaster.getDiagnostics());
+ if (appMaster == null || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
+ || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
+ throw new IOException("Failed to run job : " +
+ diagnostics);
+ }
+ return clientCache.getClient(jobId).getJobStatus(jobId);
+ }
+
+ private LocalResource createApplicationResource(FileContext fs, Path p, LocalResourceType type)
+ throws IOException {
+ LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class);
+ FileStatus rsrcStat = fs.getFileStatus(p);
+ rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs
+ .getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
+ rsrc.setSize(rsrcStat.getLen());
+ rsrc.setTimestamp(rsrcStat.getModificationTime());
+ rsrc.setType(type);
+ rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+ return rsrc;
+ }
+
+ public ApplicationSubmissionContext createApplicationSubmissionContext(
+ Configuration jobConf,
+ String jobSubmitDir, Credentials ts) throws IOException {
+ ApplicationId applicationId = resMgrDelegate.getApplicationId();
+
+ // Setup resource requirements
+ Resource capability = recordFactory.newRecordInstance(Resource.class);
+ capability.setMemory(
+ conf.getInt(
+ MRJobConfig.MR_AM_VMEM_MB, MRJobConfig.DEFAULT_MR_AM_VMEM_MB
+ )
+ );
+ capability.setVirtualCores(
+ conf.getInt(
+ MRJobConfig.MR_AM_CPU_VCORES, MRJobConfig.DEFAULT_MR_AM_CPU_VCORES
+ )
+ );
+ LOG.debug("AppMaster capability = " + capability);
+
+ // Setup LocalResources
+ Map<String, LocalResource> localResources =
+ new HashMap<String, LocalResource>();
+
+ Path jobConfPath = new Path(jobSubmitDir, MRJobConfig.JOB_CONF_FILE);
+
+ URL yarnUrlForJobSubmitDir = ConverterUtils
+ .getYarnUrlFromPath(defaultFileContext.getDefaultFileSystem()
+ .resolvePath(
+ defaultFileContext.makeQualified(new Path(jobSubmitDir))));
+ LOG.debug("Creating setup context, jobSubmitDir url is "
+ + yarnUrlForJobSubmitDir);
+
+ localResources.put(MRJobConfig.JOB_CONF_FILE,
+ createApplicationResource(defaultFileContext,
+ jobConfPath, LocalResourceType.FILE));
+ if (jobConf.get(MRJobConfig.JAR) != null) {
+ Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR));
+ LocalResource rc = createApplicationResource(defaultFileContext,
+ jobJarPath,
+ LocalResourceType.PATTERN);
+ String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN,
+ JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern();
+ rc.setPattern(pattern);
+ localResources.put(MRJobConfig.JOB_JAR, rc);
+ } else {
+ // Job jar may be null. For e.g, for pipes, the job jar is the hadoop
+ // mapreduce jar itself which is already on the classpath.
+ LOG.info("Job jar is not present. "
+ + "Not adding any jar to the list of resources.");
+ }
+
+ // TODO gross hack
+ for (String s : new String[] {
+ MRJobConfig.JOB_SPLIT,
+ MRJobConfig.JOB_SPLIT_METAINFO,
+ MRJobConfig.APPLICATION_TOKENS_FILE }) {
+ localResources.put(
+ MRJobConfig.JOB_SUBMIT_DIR + "/" + s,
+ createApplicationResource(defaultFileContext,
+ new Path(jobSubmitDir, s), LocalResourceType.FILE));
+ }
+
+ // Setup security tokens
+ DataOutputBuffer dob = new DataOutputBuffer();
+ ts.writeTokenStorageToStream(dob);
+ ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+
+ // Setup the command to run the AM
+ List<String> vargs = new ArrayList<String>(8);
+ vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
+
+ // TODO: why do we use 'conf' some places and 'jobConf' others?
+ long logSize = TaskLog.getTaskLogLength(new JobConf(conf));
+ String logLevel = jobConf.get(
+ MRJobConfig.MR_AM_LOG_LEVEL, MRJobConfig.DEFAULT_MR_AM_LOG_LEVEL);
+ MRApps.addLog4jSystemProperties(logLevel, logSize, vargs);
+
+ // Check for Java Lib Path usage in MAP and REDUCE configs
+ warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS,""), "map",
+ MRJobConfig.MAP_JAVA_OPTS, MRJobConfig.MAP_ENV);
+ warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,""), "map",
+ MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, MRJobConfig.MAPRED_ADMIN_USER_ENV);
+ warnForJavaLibPath(conf.get(MRJobConfig.REDUCE_JAVA_OPTS,""), "reduce",
+ MRJobConfig.REDUCE_JAVA_OPTS, MRJobConfig.REDUCE_ENV);
+ warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,""), "reduce",
+ MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, MRJobConfig.MAPRED_ADMIN_USER_ENV);
+
+ // Add AM admin command opts before user command opts
+ // so that it can be overridden by user
+ String mrAppMasterAdminOptions = conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
+ MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
+ warnForJavaLibPath(mrAppMasterAdminOptions, "app master",
+ MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.MR_AM_ADMIN_USER_ENV);
+ vargs.add(mrAppMasterAdminOptions);
+
+ // Add AM user command opts
+ String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
+ MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
+ warnForJavaLibPath(mrAppMasterUserOptions, "app master",
+ MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV);
+ vargs.add(mrAppMasterUserOptions);
+
+ vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
+ vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+ Path.SEPARATOR + ApplicationConstants.STDOUT);
+ vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+ Path.SEPARATOR + ApplicationConstants.STDERR);
+
+
+ Vector<String> vargsFinal = new Vector<String>(8);
+ // Final command
+ StringBuilder mergedCommand = new StringBuilder();
+ for (CharSequence str : vargs) {
+ mergedCommand.append(str).append(" ");
+ }
+ vargsFinal.add(mergedCommand.toString());
+
+ LOG.debug("Command to launch container for ApplicationMaster is : "
+ + mergedCommand);
+
+ // Setup the CLASSPATH in environment
+ // i.e. add { Hadoop jars, job jar, CWD } to classpath.
+ Map<String, String> environment = new HashMap<String, String>();
+ MRApps.setClasspath(environment, conf);
+
+ // Setup the environment variables for Admin first
+ MRApps.setEnvFromInputString(environment,
+ conf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV));
+ // Setup the environment variables (LD_LIBRARY_PATH, etc)
+ MRApps.setEnvFromInputString(environment,
+ conf.get(MRJobConfig.MR_AM_ENV));
+
+ // Parse distributed cache
+ MRApps.setupDistributedCache(jobConf, localResources);
+
+ Map<ApplicationAccessType, String> acls
+ = new HashMap<ApplicationAccessType, String>(2);
+ acls.put(ApplicationAccessType.VIEW_APP, jobConf.get(
+ MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
+ acls.put(ApplicationAccessType.MODIFY_APP, jobConf.get(
+ MRJobConfig.JOB_ACL_MODIFY_JOB,
+ MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));
+
+ // Setup ContainerLaunchContext for AM container
+ ContainerLaunchContext amContainer = BuilderUtils
+ .newContainerLaunchContext(null, UserGroupInformation
+ .getCurrentUser().getShortUserName(), capability, localResources,
+ environment, vargsFinal, null, securityTokens, acls);
+
+ // Set up the ApplicationSubmissionContext
+ ApplicationSubmissionContext appContext =
+ recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+ appContext.setApplicationId(applicationId); // ApplicationId
+ appContext.setUser( // User name
+ UserGroupInformation.getCurrentUser().getShortUserName());
+ appContext.setQueue( // Queue name
+ jobConf.get(JobContext.QUEUE_NAME,
+ YarnConfiguration.DEFAULT_QUEUE_NAME));
+ appContext.setApplicationName( // Job name
+ jobConf.get(JobContext.JOB_NAME,
+ YarnConfiguration.DEFAULT_APPLICATION_NAME));
+ appContext.setCancelTokensWhenComplete(
+ conf.getBoolean(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, true));
+ appContext.setAMContainerSpec(amContainer); // AM Container
+
+ return appContext;
+ }
+
+ @Override
+ public void setJobPriority(JobID arg0, String arg1) throws IOException,
+ InterruptedException {
+ resMgrDelegate.setJobPriority(arg0, arg1);
+ }
+
+ @Override
+ public long getProtocolVersion(String arg0, long arg1) throws IOException {
+ return resMgrDelegate.getProtocolVersion(arg0, arg1);
+ }
+
+ @Override
+ public long renewDelegationToken(Token<DelegationTokenIdentifier> arg0)
+ throws IOException, InterruptedException {
+ throw new UnsupportedOperationException("Use Token.renew instead");
+ }
+
+
+ @Override
+ public Counters getJobCounters(JobID arg0) throws IOException,
+ InterruptedException {
+ return clientCache.getClient(arg0).getJobCounters(arg0);
+ }
+
+ @Override
+ public String getJobHistoryDir() throws IOException, InterruptedException {
+ return JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
+ }
+
+ @Override
+ public JobStatus getJobStatus(JobID jobID) throws IOException,
+ InterruptedException {
+ JobStatus status = clientCache.getClient(jobID).getJobStatus(jobID);
+ return status;
+ }
+
+ @Override
+ public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1,
+ int arg2) throws IOException, InterruptedException {
+ return clientCache.getClient(arg0).getTaskCompletionEvents(arg0, arg1, arg2);
+ }
+
+ @Override
+ public String[] getTaskDiagnostics(TaskAttemptID arg0) throws IOException,
+ InterruptedException {
+ return clientCache.getClient(arg0.getJobID()).getTaskDiagnostics(arg0);
+ }
+
+ @Override
+ public TaskReport[] getTaskReports(JobID jobID, TaskType taskType)
+ throws IOException, InterruptedException {
+ return clientCache.getClient(jobID)
+ .getTaskReports(jobID, taskType);
+ }
+
+ @Override
+ public void killJob(JobID arg0) throws IOException, InterruptedException {
+ /* check if the status is not running, if not send kill to RM */
+ JobStatus status = clientCache.getClient(arg0).getJobStatus(arg0);
+ if (status.getState() != JobStatus.State.RUNNING) {
+ resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
+ return;
+ }
+
+ try {
+ /* send a kill to the AM */
+ clientCache.getClient(arg0).killJob(arg0);
+ long currentTimeMillis = System.currentTimeMillis();
+ long timeKillIssued = currentTimeMillis;
+ while ((currentTimeMillis < timeKillIssued + 10000L) && (status.getState()
+ != JobStatus.State.KILLED)) {
+ try {
+ Thread.sleep(1000L);
+ } catch(InterruptedException ie) {
+ /** interrupted, just break */
+ break;
+ }
+ currentTimeMillis = System.currentTimeMillis();
+ status = clientCache.getClient(arg0).getJobStatus(arg0);
+ }
+ } catch(IOException io) {
+ LOG.debug("Error when checking for application status", io);
+ }
+ if (status.getState() != JobStatus.State.KILLED) {
+ resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
+ }
+ }
+
+ @Override
+ public boolean killTask(TaskAttemptID arg0, boolean arg1) throws IOException,
+ InterruptedException {
+ return clientCache.getClient(arg0.getJobID()).killTask(arg0, arg1);
+ }
+
+ @Override
+ public AccessControlList getQueueAdmins(String arg0) throws IOException {
+ return new AccessControlList("*");
+ }
+
+ @Override
+ public JobTrackerStatus getJobTrackerStatus() throws IOException,
+ InterruptedException {
+ return JobTrackerStatus.RUNNING;
+ }
+
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol,
+ long clientVersion, int clientMethodsHash) throws IOException {
+ return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion,
+ clientMethodsHash);
+ }
+
+ @Override
+ public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID)
+ throws IOException {
+ return clientCache.getClient(jobID).getLogFilePath(jobID, taskAttemptID);
+ }
+
+ private static void warnForJavaLibPath(String opts, String component,
+ String javaConf, String envConf) {
+ if (opts != null && opts.contains("-Djava.library.path")) {
+ LOG.warn("Usage of -Djava.library.path in " + javaConf + " can cause " +
+ "programs to no longer function if hadoop native libraries " +
+ "are used. These values should be set as part of the " +
+ "LD_LIBRARY_PATH in the " + component + " JVM env using " +
+ envConf + " config settings.");
+ }
+ }
+}
Added: incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YarnTezClientProtocolProvider.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YarnTezClientProtocolProvider.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YarnTezClientProtocolProvider.java (added)
+++ incubator/tez/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YarnTezClientProtocolProvider.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
+
+public class YarnTezClientProtocolProvider extends ClientProtocolProvider {
+
+ @Override
+ public ClientProtocol create(Configuration conf) throws IOException {
+ if (MRConfig.YARN_TEZ_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
+ return new YARNRunner(conf);
+ }
+ return null;
+ }
+
+ @Override
+ public ClientProtocol create(InetSocketAddress addr, Configuration conf)
+ throws IOException {
+ return create(conf);
+ }
+
+ @Override
+ public void close(ClientProtocol clientProtocol) throws IOException {
+ // nothing to do
+ }
+
+}
Added: incubator/tez/tez-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider (added)
+++ incubator/tez/tez-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider Fri Mar 15 21:26:36 2013
@@ -0,0 +1,14 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+org.apache.tez.mapreduce.YarnTezClientProtocolProvider