Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/03/17 21:21:54 UTC

svn commit: r1082677 [15/38] - in /hadoop/mapreduce/branches/MR-279: ./ assembly/ ivy/ mr-client/ mr-client/hadoop-mapreduce-client-app/ mr-client/hadoop-mapreduce-client-app/src/ mr-client/hadoop-mapreduce-client-app/src/main/ mr-client/hadoop-mapredu...

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,303 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.List;
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
+import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ApplicationMaster;
+import org.apache.hadoop.yarn.ApplicationState;
+import org.apache.hadoop.yarn.YarnRemoteException;
+import org.apache.hadoop.mapreduce.v2.api.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.TaskReport;
+
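+/**
+ * ClientServiceDelegate routes MRClientProtocol calls to the MapReduce
+ * ApplicationMaster while the application is running, and redirects them to
+ * the job history server once the application has completed. Each query
+ * retries once, refreshing the proxy, if the first attempt fails.
+ */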
+public class ClientServiceDelegate {
+  private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
+  private Configuration conf;
+  private ApplicationID appId;
+  private final ResourceMgrDelegate rm;
+  private MRClientProtocol realProxy = null;
+  private String serviceAddr = "";
+  private String serviceHttpAddr = "";
+  
+  ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm, 
+      ApplicationID appId) throws AvroRemoteException {
+    this.conf = conf;
+    this.rm = rm;
+    this.appId = appId;
+    if (appId != null) {
+      refreshProxy();
+    }
+  }
+
+  private void refreshProxy() throws AvroRemoteException {
+    ApplicationMaster appMaster = rm.getApplicationMaster(appId);
+    if (ApplicationState.COMPLETED.equals(appMaster.state)) {
+      serviceAddr = conf.get("jobhistory.server.hostname") + ":"
+      + conf.get("jobhistory.server.port");
+      LOG.debug("Reconnecting to job history server " + serviceAddr);
+    } else {
+      /* TODO check to confirm it's really launched */
+      serviceAddr = appMaster.host + ":" + appMaster.rpcPort;
+      serviceHttpAddr = appMaster.host + ":" + appMaster.httpPort;
+    }
+    try {
+      instantiateProxy(serviceAddr);
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+  }
+
+  void instantiateProxy(ApplicationID applicationId, ApplicationMaster appMaster)
+      throws IOException {
+    this.appId = applicationId;
+    LOG.info("Trying to connect to the ApplicationMaster of"
+        + " application " + applicationId + " running at " + appMaster);
+    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+    serviceAddr = appMaster.host + ":" + appMaster.rpcPort;
+    serviceHttpAddr = appMaster.host + ":" + appMaster.httpPort;
+    if (UserGroupInformation.isSecurityEnabled()) {
+      // In secure mode, decode the AM's client token and attach it to the
+      // current UGI so the RPC layer can authenticate to the AM.
+      String clientTokenEncoded = appMaster.clientToken.toString();
+      Token<ApplicationTokenIdentifier> clientToken =
+          new Token<ApplicationTokenIdentifier>();
+      clientToken.decodeFromUrlString(clientTokenEncoded);
+      clientToken.setService(new Text(appMaster.host.toString() + ":"
+          + appMaster.rpcPort));
+      currentUser.addToken(clientToken);
+    }
+    instantiateProxy(serviceAddr);
+    LOG.info("Connection to the ApplicationMaster established.");
+  }
+  
+  private void instantiateProxy(final String serviceAddr) throws IOException {
+    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
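+    // Create the proxy as the current user so that any client token added to
+    // the UGI (in secure mode) is used to authenticate to the AM.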
+    realProxy = currentUser.doAs(new PrivilegedAction<MRClientProtocol>() {
+      @Override
+      public MRClientProtocol run() {
+        Configuration myConf = new Configuration(conf);
+        myConf.setClass(
+            CommonConfigurationKeysPublic.HADOOP_SECURITY_INFO_CLASS_NAME,
+            SchedulerSecurityInfo.class, SecurityInfo.class);
+        YarnRPC rpc = YarnRPC.create(myConf);
+        return (MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
+            NetUtils.createSocketAddr(serviceAddr), myConf);
+      }
+    });
+  }
+
+  public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
+      InterruptedException {
+    org.apache.hadoop.mapreduce.v2.api.JobID jobID = TypeConverter.toYarn(arg0);
+    appId = jobID.appID;
+    if (realProxy == null) refreshProxy();
+    try {
+      return TypeConverter.fromYarn(realProxy.getCounters(jobID));
+    } catch(Exception e) {
+      LOG.debug("Failing to contact application master", e);
+      refreshProxy();
+      return TypeConverter.fromYarn(realProxy.getCounters(jobID));
+    }
+  }
+
+  public String getJobHistoryDir() throws IOException, InterruptedException {
+    //TODO fix this
+    return "";
+  }
+
+  public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1,
+      int arg2) throws IOException, InterruptedException {
+    org.apache.hadoop.mapreduce.v2.api.JobID jobID = TypeConverter.toYarn(arg0);
+    appId = jobID.appID;
+    if (realProxy == null) refreshProxy();
+
+    List<org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent> list = null;
+    try {
+      list = realProxy.getTaskAttemptCompletionEvents(jobID,
+          arg1, arg2);
+    } catch(Exception e) {
+      LOG.debug("Failed to contact application master ", e);
+      refreshProxy();
+      list = realProxy.getTaskAttemptCompletionEvents(jobID,
+          arg1, arg2);
+    }
+    return TypeConverter.fromYarn(
+        list.toArray(new org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent[0]));
+  }
+
+  public String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID arg0)
+      throws IOException, InterruptedException {
+
+    List<CharSequence> list = null;
+    org.apache.hadoop.mapreduce.v2.api.TaskAttemptID attemptID = TypeConverter.toYarn(arg0);
+    appId = attemptID.taskID.jobID.appID;
+    if (realProxy == null) refreshProxy();
+
+    try {
+      list = realProxy.getDiagnostics(attemptID);
+    } catch(Exception e) {
+      LOG.debug("Failed to contact application master ", e);
+      refreshProxy();
+      list = realProxy.getDiagnostics(attemptID);
+    }
+    String[] result = new String[list.size()];
+    int i = 0;
+    for (CharSequence c : list) {
+      result[i++] = c.toString();
+    }
+    return result;
+  }
+
+  // This method lives here because the TaskReport constructor is
+  // package-private to org.apache.hadoop.mapred.
+  public static org.apache.hadoop.mapred.TaskReport[] fromYarn(
+      List<TaskReport> reports) {
+    org.apache.hadoop.mapred.TaskReport[] result = 
+      new org.apache.hadoop.mapred.TaskReport[reports.size()];
+    int i = 0;
+    for (TaskReport report : reports) {
+      List<CharSequence> diag = report.diagnostics;
+      String[] diagnosticArr = new String[diag.size()];
+      int j = 0;
+      for (CharSequence c : diag) {
+        diagnosticArr[j++] = c.toString();
+      }
+      org.apache.hadoop.mapred.TaskReport oldReport =
+        new org.apache.hadoop.mapred.TaskReport(
+            TypeConverter.fromYarn(report.id), report.progress,
+            report.state.toString(),
+            diagnosticArr, TypeConverter.fromYarn(report.state),
+            report.startTime, report.finishTime,
+            new org.apache.hadoop.mapred.Counters(
+                TypeConverter.fromYarn(report.counters)));
+      result[i++] = oldReport;
+    }
+    return result;
+  }
+
+ 
+  public JobReport getJobReport(org.apache.hadoop.mapreduce.v2.api.JobID jobID)
+      throws AvroRemoteException, YarnRemoteException {
+    appId = jobID.appID;
+    if (realProxy == null) refreshProxy();
+    
+    try {
+      return realProxy.getJobReport(jobID);
+    } catch (Exception e) {
+      LOG.debug("Failed to contact application master ", e);
+      refreshProxy();
+      return realProxy.getJobReport(jobID);
+    }
+  }
+
+  public JobStatus getJobStatus(org.apache.hadoop.mapreduce.v2.api.JobID jobId) 
+      throws AvroRemoteException, YarnRemoteException {
+    appId = jobId.appID;
+    if (realProxy == null) refreshProxy();
+    String stagingDir = conf.get("yarn.apps.stagingDir");
+    String jobFile = stagingDir + "/" + jobId.toString();
+    return TypeConverter.fromYarn(getJobReport(jobId), jobFile, serviceHttpAddr);
+  }
+  
+
+  public JobStatus getJobStatus(JobID jobID) throws YarnRemoteException,
+      AvroRemoteException {
+    return getJobStatus(TypeConverter.toYarn(jobID));
+  }
+
+  public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID jobID, TaskType taskType)
+      throws YarnRemoteException, AvroRemoteException {
+    List<TaskReport> taskReports = null;
+    org.apache.hadoop.mapreduce.v2.api.JobID nJobID = TypeConverter.toYarn(jobID);
+    appId = nJobID.appID;
+    if (realProxy == null) refreshProxy();
+
+    try {
+      taskReports = realProxy.getTaskReports(nJobID,
+          TypeConverter.toYarn(taskType));
+    } catch (Exception e) {
+      LOG.debug("Failed to contact application master ", e);
+      refreshProxy();
+      taskReports = realProxy.getTaskReports(nJobID,
+          TypeConverter.toYarn(taskType));
+    }
+    // List.toArray() without an argument returns Object[]; pass a typed array
+    // so the result can be returned as TaskReport[] without a failing cast.
+    return TypeConverter.fromYarn(taskReports)
+        .toArray(new org.apache.hadoop.mapreduce.TaskReport[0]);
+  }
+
+  public Void killJob(JobID jobID) throws YarnRemoteException,
+      AvroRemoteException {
+    org.apache.hadoop.mapreduce.v2.api.JobID  nJobID = TypeConverter.toYarn(jobID);
+    appId = nJobID.appID;
+    if (realProxy == null) refreshProxy();
+    
+    try {
+      realProxy.killJob(nJobID);
+    } catch(Exception e) {
+      LOG.debug("Failed to contact application master ", e);
+      refreshProxy();
+      realProxy.killJob(nJobID);
+    }
+    return null;
+  }
+
+  public boolean killTask(TaskAttemptID taskAttemptID, boolean killed)
+      throws YarnRemoteException, AvroRemoteException {
+    org.apache.hadoop.mapreduce.v2.api.TaskAttemptID attemptID 
+      = TypeConverter.toYarn(taskAttemptID);
+    appId = attemptID.taskID.jobID.appID;
+    if (realProxy == null) refreshProxy();
+    
+    try {
+      realProxy.killTaskAttempt(attemptID);
+    } catch(Exception e) {
+      LOG.debug("Failed to contact application master ", e);
+      refreshProxy();
+      realProxy.killTaskAttempt(attemptID);
+    }
+    return true;
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,226 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.QueueAclsInfo;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ApplicationMaster;
+import org.apache.hadoop.yarn.ApplicationState;
+import org.apache.hadoop.yarn.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.ClientRMProtocol;
+import org.apache.hadoop.yarn.YarnClusterMetrics;
+
+// TODO: This should be part of something like yarn-client.
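+/**
+ * ResourceMgrDelegate wraps the ClientRMProtocol connection to the
+ * ResourceManager and adapts it to the JobClient-facing calls.
+ */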
+public class ResourceMgrDelegate {
+  private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class);
+      
+  private Configuration conf;
+  ClientRMProtocol applicationsManager;
+  private ApplicationID applicationId;
+
+  public ResourceMgrDelegate(Configuration conf) throws UnsupportedFileSystemException {
+    this.conf = conf;
+    YarnRPC rpc = YarnRPC.create(conf);
+    InetSocketAddress rmAddress =
+        NetUtils.createSocketAddr(conf.get(
+            YarnConfiguration.APPSMANAGER_ADDRESS,
+            YarnConfiguration.DEFAULT_APPSMANAGER_BIND_ADDRESS));
+    LOG.info("Connecting to ResourceManager at " + rmAddress);
+    Configuration appsManagerServerConf = new Configuration(this.conf);
+    appsManagerServerConf.setClass(
+        CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME,
+        ClientRMSecurityInfo.class, SecurityInfo.class);
+    applicationsManager =
+        (ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class,
+            rmAddress, appsManagerServerConf);
+    LOG.info("Connected to ResourceManager at " + rmAddress);
+  }
+  
+  public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
+      throws IOException, InterruptedException {
+    return;
+  }
+
+
+  public TaskTrackerInfo[] getActiveTrackers() throws IOException,
+      InterruptedException {
+    return null;
+  }
+
+
+  public JobStatus[] getAllJobs() throws IOException, InterruptedException {
+    return null;
+  }
+
+
+  public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException,
+      InterruptedException {
+    throw new IOException("Not implemented");
+  }
+
+
+  public QueueInfo[] getChildQueues(String arg0) throws IOException,
+      InterruptedException {
+    throw new IOException("Not implemented");
+  }
+
+
+  public ClusterMetrics getClusterMetrics() throws IOException,
+      InterruptedException {
+    YarnClusterMetrics metrics = applicationsManager.getClusterMetrics();
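+    // TODO: most of these values are rough placeholders derived from the
+    // node-manager count until the RM exposes the corresponding metrics.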
+    ClusterMetrics oldMetrics = new ClusterMetrics(1, 1, 1, 1, 1, 1, 
+        metrics.numNodeManagers * 10, metrics.numNodeManagers * 2, 1,
+        metrics.numNodeManagers, 0, 0);
+    return oldMetrics;
+  }
+
+
+  public Token<DelegationTokenIdentifier> getDelegationToken(Text arg0)
+      throws IOException, InterruptedException {
+    throw new IOException("Not Implemented");
+  }
+
+
+  public String getFilesystemName() throws IOException, InterruptedException {
+    return FileSystem.get(conf).getUri().toString();
+  }
+
+  public JobID getNewJobID() throws IOException, InterruptedException {
+    applicationId = applicationsManager.getNewApplicationId();
+    return TypeConverter.fromYarn(applicationId);
+  }
+
+  
+  public QueueInfo getQueue(String arg0) throws IOException,
+      InterruptedException {
+    throw new IOException("Not implemented");
+  }
+
+
+  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
+      InterruptedException {
+    throw new IOException("Not implemented");
+  }
+
+
+  public QueueInfo[] getQueues() throws IOException, InterruptedException {
+    throw new IOException("Not implemented");
+  }
+
+
+  public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
+    throw new IOException("Not Implemented");
+  }
+
+
+  public String getStagingAreaDir() throws IOException, InterruptedException {
+    Path path = new Path(conf.get(YARNApplicationConstants.APPS_STAGING_DIR_KEY));
+    LOG.debug("getStagingAreaDir: dir=" + path);
+    return path.toString();
+  }
+
+
+  public String getSystemDir() throws IOException, InterruptedException {
+    Path sysDir = new Path(
+        YARNApplicationConstants.JOB_SUBMIT_DIR);
+    FileContext.getFileContext(conf).delete(sysDir, true);
+    return sysDir.toString();
+  }
+  
+
+  public long getTaskTrackerExpiryInterval() throws IOException,
+      InterruptedException {
+    return 0;
+  }
+  
+  public void setJobPriority(JobID arg0, String arg1) throws IOException,
+      InterruptedException {
+    return;
+  }
+
+
+  public long getProtocolVersion(String arg0, long arg1) throws IOException {
+    return 0;
+  }
+
+  public long renewDelegationToken(Token<DelegationTokenIdentifier> arg0)
+      throws IOException, InterruptedException {
+    throw new IOException("Not implemented");
+  }
+  
+  
+  public ApplicationID submitApplication(ApplicationSubmissionContext appContext) 
+  throws IOException {
+    appContext.applicationId = applicationId;
+    applicationsManager.submitApplication(appContext);
+    LOG.info("Submitted application " + applicationId + " to ResourceManager");
+    return applicationId;
+  }
+
+  public ApplicationMaster getApplicationMaster(ApplicationID appId) 
+    throws AvroRemoteException {
+    ApplicationMaster appMaster = 
+      applicationsManager.getApplicationMaster(appId);
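+    // Poll the RM every two seconds until the AM is running or has reached a
+    // terminal state.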
+    while (appMaster.state != ApplicationState.RUNNING &&
+        appMaster.state != ApplicationState.KILLED && 
+        appMaster.state != ApplicationState.FAILED && 
+        appMaster.state != ApplicationState.COMPLETED) {
+      appMaster = applicationsManager.getApplicationMaster(appId);
+      try {
+        LOG.info("Waiting for appMaster to start..");
+        Thread.sleep(2000);
+      } catch(InterruptedException ie) {
+        //DO NOTHING
+      }
+    }
+    return appMaster;
+  }
+
+  public ApplicationID getApplicationId() {
+    return applicationId;
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,533 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.QueueAclsInfo;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.TaskReport;
+import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.AvroUtil;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ApplicationMaster;
+import org.apache.hadoop.yarn.ApplicationState;
+import org.apache.hadoop.yarn.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.LocalResource;
+import org.apache.hadoop.yarn.LocalResourceType;
+import org.apache.hadoop.yarn.LocalResourceVisibility;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.URL;
+
+/**
+ * This class enables the current JobClient (0.22 hadoop) to run on YARN.
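+ *
+ * A minimal usage sketch (illustrative only; it assumes a populated
+ * job-submit directory and credentials):
+ * <pre>
+ *   YARNRunner runner = new YARNRunner(conf);
+ *   JobID jobId = runner.getNewJobID();
+ *   JobStatus status = runner.submitJob(jobId, jobSubmitDir, credentials);
+ * </pre>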
+ */
+public class YARNRunner implements ClientProtocol {
+
+  private static final Log LOG = LogFactory.getLog(YARNRunner.class);
+
+  public static final String YARN_AM_RESOURCE_KEY = "yarn.am.mapreduce.resource.mb";
+  private static final int DEFAULT_YARN_AM_RESOURCE = 1024;
+  
+  private ResourceMgrDelegate resMgrDelegate;
+  private ClientServiceDelegate clientServiceDelegate;
+  private YarnConfiguration conf;
+
+  /**
+   * YARNRunner encapsulates the client interface of YARN.
+   * @param conf the configuration object for the client
+   */
+  public YARNRunner(Configuration conf, ApplicationID appID)
+      throws AvroRemoteException {
+    this.conf = new YarnConfiguration(conf);
+    try {
+      this.resMgrDelegate = new ResourceMgrDelegate(conf);
+      this.clientServiceDelegate = new ClientServiceDelegate(conf,
+          resMgrDelegate, appID);
+    } catch (UnsupportedFileSystemException ufe) {
+      throw new RuntimeException("Error in instantiating YarnClient", ufe);
+    }
+  }
+
+  /**
+   * YARNRunner encapsulates the client interface of YARN.
+   * @param conf the configuration object for the client
+   */
+  public YARNRunner(Configuration conf) throws AvroRemoteException {
+    this(conf, null);
+  }
+
+  @Override
+  public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
+      throws IOException, InterruptedException {
+    resMgrDelegate.cancelDelegationToken(arg0);
+  }
+
+  @Override
+  public TaskTrackerInfo[] getActiveTrackers() throws IOException,
+      InterruptedException {
+    throw new IOException("Not implemented");
+  }
+
+  @Override
+  public JobStatus[] getAllJobs() throws IOException, InterruptedException {
+    return resMgrDelegate.getAllJobs();
+  }
+
+  @Override
+  public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getBlacklistedTrackers();
+  }
+
+  @Override
+  public QueueInfo[] getChildQueues(String arg0) throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getChildQueues(arg0);
+  }
+
+  @Override
+  public ClusterMetrics getClusterMetrics() throws IOException,
+      InterruptedException {  
+    return resMgrDelegate.getClusterMetrics();
+  }
+
+  @Override
+  public Token<DelegationTokenIdentifier> getDelegationToken(Text arg0)
+      throws IOException, InterruptedException {
+    return resMgrDelegate.getDelegationToken(arg0);
+  }
+
+  @Override
+  public String getFilesystemName() throws IOException, InterruptedException {
+    return resMgrDelegate.getFilesystemName();
+  }
+
+  @Override
+  public JobID getNewJobID() throws IOException, InterruptedException {
+    return resMgrDelegate.getNewJobID();
+  }
+
+  @Override
+  public QueueInfo getQueue(String arg0) throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getQueue(arg0);
+  }
+
+  @Override
+  public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getQueueAclsForCurrentUser();
+  }
+
+  @Override
+  public QueueInfo[] getQueues() throws IOException, InterruptedException {
+    return resMgrDelegate.getQueues();
+  }
+
+  @Override
+  public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
+    return resMgrDelegate.getRootQueues();
+  }
+
+  @Override
+  public String getStagingAreaDir() throws IOException, InterruptedException {
+    return resMgrDelegate.getStagingAreaDir();
+  }
+
+  @Override
+  public String getSystemDir() throws IOException, InterruptedException {
+    return resMgrDelegate.getSystemDir();
+  }
+  
+  @Override
+  public long getTaskTrackerExpiryInterval() throws IOException,
+      InterruptedException {
+    return resMgrDelegate.getTaskTrackerExpiryInterval();
+  }
+  
+  @Override
+  public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
+      throws IOException, InterruptedException {
+
+    // Upload only in security mode: TODO
+    Path applicationTokensFile =
+        new Path(jobSubmitDir, YarnConfiguration.APPLICATION_TOKENS_FILE);
+    try {
+      ts.writeTokenStorageFile(applicationTokensFile, conf);
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+
+    // XXX Remove
+    Path submitJobDir = new Path(jobSubmitDir);
+    FileContext defaultFS = FileContext.getFileContext(conf);
+    Path submitJobFile =
+      defaultFS.makeQualified(JobSubmissionFiles.getJobConfPath(submitJobDir));
+    FSDataInputStream in = defaultFS.open(submitJobFile);
+    conf.addResource(in);
+    // ---
+
+    // Construct necessary information to start the MR AM
+    ApplicationSubmissionContext appContext = 
+      getApplicationSubmissionContext(conf, jobSubmitDir, ts);
+    setupDistributedCache(conf, appContext);
+    
+    // XXX Remove
+    in.close();
+    // ---
+    
+    // Submit to ResourceManager
+    ApplicationID applicationId = resMgrDelegate.submitApplication(appContext);
+    
+    ApplicationMaster appMaster = 
+      resMgrDelegate.getApplicationMaster(applicationId);
+    if (appMaster.state == ApplicationState.FAILED
+        || appMaster.state == ApplicationState.KILLED) {
+      throw new AvroRemoteException("Failed to run job: application "
+          + applicationId + " is in state " + appMaster.state);
+    }
+    clientServiceDelegate.instantiateProxy(applicationId, appMaster);
+    return clientServiceDelegate.getJobStatus(jobId);
+  }
+
+  private LocalResource createApplicationResource(FileContext fs, Path p)
+      throws IOException {
+    LocalResource rsrc = new LocalResource();
+    FileStatus rsrcStat = fs.getFileStatus(p);
+    rsrc.resource = AvroUtil.getYarnUrlFromPath(rsrcStat.getPath());
+    rsrc.size = rsrcStat.getLen();
+    rsrc.timestamp = rsrcStat.getModificationTime();
+    rsrc.type = LocalResourceType.FILE;
+    rsrc.state = LocalResourceVisibility.APPLICATION;
+    return rsrc;
+  }
+
+  private ApplicationSubmissionContext getApplicationSubmissionContext(
+      Configuration jobConf,
+      String jobSubmitDir, Credentials ts) throws IOException {
+    ApplicationSubmissionContext appContext =
+        new ApplicationSubmissionContext();
+    ApplicationID applicationId = resMgrDelegate.getApplicationId();
+    appContext.applicationId = applicationId;
+    Resource capability = new Resource();
+    capability.memory =
+        conf.getInt(YARN_AM_RESOURCE_KEY, DEFAULT_YARN_AM_RESOURCE);
+    LOG.info("Master capability = " + capability);
+    appContext.masterCapability = capability;
+
+    FileContext defaultFS = FileContext.getFileContext(conf);
+    Path jobConfPath = new Path(jobSubmitDir, YARNApplicationConstants.JOB_CONF_FILE);
+    
+    URL yarnUrlForJobSubmitDir =
+        AvroUtil.getYarnUrlFromPath(defaultFS.makeQualified(new Path(
+            jobSubmitDir)));
+    appContext.resources = new HashMap<CharSequence, URL>();
+    LOG.debug("Creating setup context, jobSubmitDir url is "
+        + yarnUrlForJobSubmitDir);
+
+    appContext.resources.put(YARNApplicationConstants.JOB_SUBMIT_DIR,
+        yarnUrlForJobSubmitDir);
+
+    appContext.resources_todo = new HashMap<CharSequence,LocalResource>();
+    appContext.resources_todo.put(YARNApplicationConstants.JOB_CONF_FILE,
+          createApplicationResource(defaultFS,
+          jobConfPath));
+    appContext.resources_todo.put(YARNApplicationConstants.JOB_JAR,
+          createApplicationResource(defaultFS,
+            new Path(jobSubmitDir, YARNApplicationConstants.JOB_JAR)));
+    // TODO gross hack
+    for (String s : new String[] { "job.split", "job.splitmetainfo",
+        YarnConfiguration.APPLICATION_TOKENS_FILE }) {
+      appContext.resources_todo.put(
+          YARNApplicationConstants.JOB_SUBMIT_DIR + "/" + s,
+          createApplicationResource(defaultFS,
+            new Path(jobSubmitDir, s)));
+    }
+
+    // TODO: Only if security is on.
+    List<CharSequence> fsTokens = new ArrayList<CharSequence>();
+    for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
+      fsTokens.add(token.encodeToUrlString());
+    }
+    
+    // TODO - Remove this!
+    appContext.fsTokens = fsTokens;
+    DataOutputBuffer dob = new DataOutputBuffer();
+    ts.writeTokenStorageToStream(dob);
+    appContext.fsTokens_todo =
+      ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+
+    // Add queue information
+    appContext.queue = 
+      jobConf.get(JobContext.QUEUE_NAME, JobConf.DEFAULT_QUEUE_NAME);
+    
+    // Add job name
+    appContext.applicationName = jobConf.get(JobContext.JOB_NAME, "N/A");
+    
+    // Add the command line
+    String javaHome = "$JAVA_HOME";
+    Vector<CharSequence> vargs = new Vector<CharSequence>(8);
+    vargs.add(javaHome + "/bin/java");
+    vargs.add(conf.get(YARNApplicationConstants.MR_APPMASTER_COMMAND_OPTS,
+        "-Dhadoop.root.logger=DEBUG,console -Xmx1024m"));
+
+    // Add { job jar, MR app jar } to classpath.
+    appContext.environment = new HashMap<CharSequence, CharSequence>();
+    MRApps.setInitialClasspath(appContext.environment);
+    MRApps.addToClassPath(appContext.environment,
+        YARNApplicationConstants.JOB_JAR);
+    MRApps.addToClassPath(appContext.environment,
+        YARNApplicationConstants.YARN_MAPREDUCE_APP_JAR_PATH);
+    vargs.add("org.apache.hadoop.mapreduce.v2.app.MRAppMaster");
+    vargs.add(String.valueOf(applicationId.clusterTimeStamp));
+    vargs.add(String.valueOf(applicationId.id));
+    vargs.add("1>logs/stderr");
+    vargs.add("2>logs/stdout");
+
+    Vector<CharSequence> vargsFinal = new Vector<CharSequence>(8);
+    // Final command
+    StringBuilder mergedCommand = new StringBuilder();
+    for (CharSequence str : vargs) {
+      mergedCommand.append(str).append(" ");
+    }
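+    // "mkdir logs" first creates the directory that the stdout/stderr
+    // redirections in the command above point into.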
+    vargsFinal.add("mkdir logs;" + mergedCommand.toString());
+
+    LOG.info("Command to launch container for ApplicationMaster is : "
+        + mergedCommand);
+
+    appContext.command = vargsFinal;
+    // TODO: RM should get this from RPC.
+    appContext.user = UserGroupInformation.getCurrentUser().getShortUserName();
+    return appContext;
+  }
+  
+  /**
+   * TODO: Copied for now from TaskAttemptImpl.java ... fixme
+   */
+  private void setupDistributedCache(Configuration conf, 
+      ApplicationSubmissionContext container) throws IOException {
+    
+    // Cache archives
+    parseDistributedCacheArtifacts(container, LocalResourceType.ARCHIVE, 
+        DistributedCache.getCacheArchives(conf), 
+        DistributedCache.getArchiveTimestamps(conf), 
+        getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES), 
+        DistributedCache.getArchiveVisibilities(conf), 
+        DistributedCache.getArchiveClassPaths(conf));
+    
+    // Cache files
+    parseDistributedCacheArtifacts(container, LocalResourceType.FILE, 
+        DistributedCache.getCacheFiles(conf),
+        DistributedCache.getFileTimestamps(conf),
+        getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
+        DistributedCache.getFileVisibilities(conf),
+        DistributedCache.getFileClassPaths(conf));
+  }
+
+  // TODO - Move this to MR!
+  // Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[], long[], boolean[], Path[], FileType)
+  private static void parseDistributedCacheArtifacts(
+      ApplicationSubmissionContext container, LocalResourceType type,
+      URI[] uris, long[] timestamps, long[] sizes, boolean[] visibilities,
+      Path[] classpaths) throws IOException {
+
+    if (uris != null) {
+      // Sanity check
+      if ((uris.length != timestamps.length) || (uris.length != sizes.length) ||
+          (uris.length != visibilities.length)) {
+        throw new IllegalArgumentException("Invalid specification for " +
+            "distributed-cache artifacts of type " + type + " :" +
+            " #uris=" + uris.length +
+            " #timestamps=" + timestamps.length +
+            " #visibilities=" + visibilities.length
+            );
+      }
+      
+      Map<String, Path> classPaths = new HashMap<String, Path>();
+      if (classpaths != null) {
+        for (Path p : classpaths) {
+          classPaths.put(p.toUri().getPath().toString(), p);
+        }
+      }
+      for (int i = 0; i < uris.length; ++i) {
+        URI u = uris[i];
+        Path p = new Path(u.toString());
+        // Add URI fragment or just the filename
+        Path name = new Path((null == u.getFragment())
+          ? p.getName()
+          : u.getFragment());
+        if (name.isAbsolute()) {
+          throw new IllegalArgumentException("Resource name must be relative");
+        }
+        container.resources_todo.put(
+            name.toUri().getPath(),
+            getLocalResource(
+                uris[i], type, 
+                visibilities[i]
+                  ? LocalResourceVisibility.PUBLIC
+                  : LocalResourceVisibility.PRIVATE,
+                sizes[i], timestamps[i])
+        );
+        if (classPaths.containsKey(u.getPath())) {
+          MRApps.addToClassPath(container.environment, name.toUri().getPath());
+        }
+      }
+    }
+  }
+
+  // TODO - Move this to MR!
+  private static long[] getFileSizes(Configuration conf, String key) {
+    String[] strs = conf.getStrings(key);
+    if (strs == null) {
+      return null;
+    }
+    long[] result = new long[strs.length];
+    for (int i = 0; i < strs.length; ++i) {
+      result[i] = Long.parseLong(strs[i]);
+    }
+    return result;
+  }
+  
+  private static LocalResource getLocalResource(URI uri, 
+      LocalResourceType type, LocalResourceVisibility visibility, 
+      long size, long timestamp) {
+    LocalResource resource = new LocalResource();
+    resource.resource = AvroUtil.getYarnUrlFromURI(uri);
+    resource.type = type;
+    resource.state = visibility;
+    resource.size = size;
+    resource.timestamp = timestamp;
+    return resource;
+  }
+  
+  @Override
+  public void setJobPriority(JobID arg0, String arg1) throws IOException,
+      InterruptedException {
+    resMgrDelegate.setJobPriority(arg0, arg1);
+  }
+
+  @Override
+  public long getProtocolVersion(String arg0, long arg1) throws IOException {
+    return resMgrDelegate.getProtocolVersion(arg0, arg1);
+  }
+  
+  @Override
+  public long renewDelegationToken(Token<DelegationTokenIdentifier> arg0)
+      throws IOException, InterruptedException {
+    return resMgrDelegate.renewDelegationToken(arg0);
+  }
+
+  
+  @Override
+  public Counters getJobCounters(JobID arg0) throws IOException,
+      InterruptedException {
+    return clientServiceDelegate.getJobCounters(arg0);
+  }
+
+  @Override
+  public String getJobHistoryDir() throws IOException, InterruptedException {
+    return clientServiceDelegate.getJobHistoryDir();
+  }
+
+  @Override
+  public JobStatus getJobStatus(JobID jobID) throws IOException,
+      InterruptedException {
+    JobStatus status = clientServiceDelegate.getJobStatus(jobID);
+    if (status.isJobComplete()) {
+      // Clean up the Container running the ApplicationMaster.
+    }
+    return status;
+  }
+  
+  @Override
+  public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1,
+      int arg2) throws IOException, InterruptedException {
+    return clientServiceDelegate.getTaskCompletionEvents(arg0, arg1, arg2);
+  }
+
+  @Override
+  public String[] getTaskDiagnostics(TaskAttemptID arg0) throws IOException,
+      InterruptedException {
+    return clientServiceDelegate.getTaskDiagnostics(arg0);
+  }
+
+  @Override
+  public TaskReport[] getTaskReports(JobID jobID, TaskType taskType)
+  throws IOException, InterruptedException {
+    return clientServiceDelegate
+        .getTaskReports(jobID, taskType);
+  }
+
+  @Override
+  public void killJob(JobID arg0) throws IOException, InterruptedException {
+    clientServiceDelegate.killJob(arg0);
+  }
+
+  @Override
+  public boolean killTask(TaskAttemptID arg0, boolean arg1) throws IOException,
+      InterruptedException {
+    return clientServiceDelegate.killTask(arg0, arg1);
+  }
+
+  @Override
+  public AccessControlList getQueueAdmins(String arg0) throws IOException {
+    return new AccessControlList("*");
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YarnClientFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YarnClientFactory.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YarnClientFactory.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YarnClientFactory.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,34 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.ClientFactory;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+
+public class YarnClientFactory extends ClientFactory {
+
+  @Override
+  protected ClientProtocol createClient(Configuration conf)
+      throws IOException {
+    return new YARNRunner(conf);
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,283 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.ipc.Server;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.YARNRunner;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManager;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ApplicationMaster;
+import org.apache.hadoop.yarn.ApplicationState;
+import org.apache.hadoop.yarn.ApplicationStatus;
+import org.apache.hadoop.yarn.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.ClientRMProtocol;
+import org.apache.hadoop.yarn.YarnClusterMetrics;
+import org.apache.hadoop.yarn.YarnRemoteException;
+import org.apache.hadoop.mapreduce.v2.api.CounterGroup;
+import org.apache.hadoop.mapreduce.v2.api.Counters;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.v2.api.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.TaskReport;
+import org.junit.Test;
+
+public class TestClientRedirect {
+
+  private static final Log LOG = LogFactory.getLog(TestClientRedirect.class);
+  private static final String RMADDRESS = "0.0.0.0:8054";
+  private static final String AMHOSTNAME = "0.0.0.0";
+  private static final int AMPORT = 10020;
+  private boolean firstRedirect = false; 
+  private boolean secondRedirect = false;
+ 
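+  // Scenario: the first getCounters() call fails at the mock AM endpoint,
+  // which makes the delegate refresh its proxy. The mock RM then reports the
+  // application as COMPLETED, so the delegate redirects to the (mock) job
+  // history server address and the retried call succeeds.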
+  @Test
+  public void testRedirect() throws Exception {
+    
+    Configuration conf = new YarnConfiguration();
+    conf.set(YarnConfiguration.APPSMANAGER_ADDRESS, RMADDRESS);
+    conf.set("jobhistory.server.hostname", AMHOSTNAME);
+    conf.setInt("jobhistory.server.port", AMPORT);
+    RMService rmService = new RMService("test");
+    rmService.init(conf);
+    rmService.start();
+  
+    MRClientProtocolService clientService =
+      new MRClientProtocolService();
+    clientService.init(conf);
+    clientService.start(conf);
+  
+    LOG.info("services started");
+    YARNRunner yarnRunner = new YARNRunner(conf);
+    org.apache.hadoop.mapreduce.JobID jobID =
+      new org.apache.hadoop.mapred.JobID("201103121733", 1);
+    yarnRunner.getJobCounters(jobID);
+    Assert.assertTrue(firstRedirect);
+    Assert.assertTrue(secondRedirect);
+    
+    rmService.stop();
+    clientService.stop();
+  }
+
+  class RMService extends AbstractService implements ClientRMProtocol {
+    private ApplicationsManager applicationsManager;
+    private String clientServiceBindAddress;
+    InetSocketAddress clientBindAddress;
+    private Server server;
+
+    public RMService(String name) {
+      super(name);
+    }
+
+    @Override
+    public void init(Configuration conf) {
+      clientServiceBindAddress = RMADDRESS;
+      /*
+      clientServiceBindAddress = conf.get(
+          YarnConfiguration.APPSMANAGER_ADDRESS,
+          YarnConfiguration.DEFAULT_APPSMANAGER_BIND_ADDRESS);
+          */
+      clientBindAddress = NetUtils.createSocketAddr(clientServiceBindAddress);
+      super.init(conf);
+    }
+
+    @Override
+    public void start() {
+      // All the clients to appsManager are supposed to be authenticated via
+      // Kerberos if security is enabled, so no secretManager.
+      YarnRPC rpc = YarnRPC.create(getConfig());
+      Configuration clientServerConf = new Configuration(getConfig());
+      this.server = rpc.getServer(ClientRMProtocol.class, this,
+          clientBindAddress, clientServerConf, null);
+      this.server.start();
+      super.start();
+    }
+
+    @Override
+    public ApplicationID getNewApplicationId() throws AvroRemoteException {
+      return null;
+    }
+
+    @Override
+    public ApplicationMaster getApplicationMaster(ApplicationID applicationId)
+        throws AvroRemoteException {
+      ApplicationMaster master = new ApplicationMaster();
+      master.applicationId = applicationId;
+      master.status = new ApplicationStatus();
+      master.status.applicationId = applicationId;
+      if (!firstRedirect) {
+        master.state = ApplicationState.RUNNING;
+      } else {
+        master.state = ApplicationState.COMPLETED;
+      }
+      master.host = AMHOSTNAME;
+      master.rpcPort = AMPORT;
+      return master;
+    }
+
+    @Override
+    public Void submitApplication(ApplicationSubmissionContext context)
+        throws AvroRemoteException {
+      throw new AvroRemoteException("Test");
+    }
+
+    @Override
+    public Void finishApplication(ApplicationID applicationId)
+        throws AvroRemoteException {
+      return null;
+    }
+
+    @Override
+    public YarnClusterMetrics getClusterMetrics() throws AvroRemoteException {
+      return null;
+    }
+  }
+
+  class MRClientProtocolService extends AbstractService 
+      implements MRClientProtocol {
+    private InetSocketAddress bindAddress;
+    private Server server;
+    
+    public MRClientProtocolService() {
+      super("TestClientService");
+    }
+
+    public void start(Configuration conf) {
+      YarnRPC rpc = YarnRPC.create(conf);
+      //TODO : use fixed port ??
+      InetSocketAddress address = NetUtils.createSocketAddr(AMHOSTNAME + ":" + AMPORT);
+      InetAddress hostNameResolved = null;
+      try {
+        address.getAddress();
+        hostNameResolved = InetAddress.getLocalHost();
+      } catch (UnknownHostException e) {
+        throw new YarnException(e);
+      }
+
+      server =
+          rpc.getServer(MRClientProtocol.class, this, address,
+              conf, null);
+      server.start();
+      this.bindAddress =
+        NetUtils.createSocketAddr(hostNameResolved.getHostAddress()
+            + ":" + server.getPort());
+      super.start();
+    }
+
+    public void stop() {
+      server.close();
+      super.stop();
+    }
+
+    @Override
+    public Counters getCounters(JobID jobID) throws AvroRemoteException,
+      YarnRemoteException {
+      if (!firstRedirect) {
+        firstRedirect = true;
+        throw RPCUtil.getRemoteException(new IOException("Fail"));
+      } else {
+        secondRedirect = true;
+        Counters counters = new Counters();
+        counters.groups = new HashMap<CharSequence, CounterGroup>();
+        return counters;
+      }
+    }
+
+    @Override
+    public List<CharSequence> getDiagnostics(
+        org.apache.hadoop.mapreduce.v2.api.TaskAttemptID taskAttemptID)
+        throws AvroRemoteException, YarnRemoteException {
+      return null;
+    }
+
+    @Override
+    public JobReport getJobReport(JobID jobID) throws AvroRemoteException,
+        YarnRemoteException {
+      return null;
+    }
+
+    @Override
+    public List<TaskAttemptCompletionEvent> getTaskAttemptCompletionEvents(
+        JobID jobID, int fromEventId, int maxEvents)
+        throws AvroRemoteException, YarnRemoteException {
+      return null;
+    }
+
+    @Override
+    public TaskAttemptReport getTaskAttemptReport(
+        org.apache.hadoop.mapreduce.v2.api.TaskAttemptID taskAttemptID)
+        throws AvroRemoteException, YarnRemoteException {
+      return null;
+    }
+
+    @Override
+    public TaskReport getTaskReport(org.apache.hadoop.mapreduce.v2.api.TaskID taskID)
+        throws AvroRemoteException, YarnRemoteException {
+      return null;
+    }
+
+    @Override
+    public List<TaskReport> getTaskReports(JobID jobID,
+        org.apache.hadoop.mapreduce.v2.api.TaskType taskType)
+        throws AvroRemoteException, YarnRemoteException {
+      return null;
+    }
+
+    @Override
+    public Void killJob(JobID jobID) throws AvroRemoteException,
+        YarnRemoteException {
+      return null;
+    }
+
+    @Override
+    public Void killTask(org.apache.hadoop.mapreduce.v2.api.TaskID taskID)
+        throws AvroRemoteException, YarnRemoteException {
+      return null;
+    }
+
+    @Override
+    public Void killTaskAttempt(
+        org.apache.hadoop.mapreduce.v2.api.TaskAttemptID taskAttemptID)
+        throws AvroRemoteException, YarnRemoteException {
+      return null;
+    }
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,111 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.ShuffleHandler;
+import org.apache.hadoop.mapred.YarnClientFactory;
+import org.apache.hadoop.mapreduce.ClientFactory;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.service.Service;
+
+/**
+ * Configures and starts the MR-specific components in the YARN cluster.
+ */
+public class MiniMRYarnCluster extends MiniYARNCluster {
+
+  public static final String APPJAR =
+    "../hadoop-mapreduce-client-app/target/"
+        + YARNApplicationConstants.HADOOP_MAPREDUCE_CLIENT_APP_JAR_NAME;
+
+  private static final Log LOG = LogFactory.getLog(MiniMRYarnCluster.class);
+  private JobHistoryServer historyServer;
+
+  public MiniMRYarnCluster(String testName) {
+    super(testName);
+    //TODO: add the history server
+    //historyServer = new JobHistoryServerWrapper();
+    //addService(historyServer);
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    conf.setClass("mapreduce.clientfactory.class.name",
+        YarnClientFactory.class, ClientFactory.class);
+    conf.setStrings(YARNApplicationConstants.NM_HOSTS_CONF_KEY,
+        new String[] { NMConfig.DEFAULT_NM_BIND_ADDRESS });
+    conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name"));
+    conf.set(YARNApplicationConstants.APPS_STAGING_DIR_KEY, new File(
+        getTestWorkDir(),
+        "apps_staging_dir/${user.name}/").getAbsolutePath());
+    conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is "local", which
+                                               // prevents shuffle from running
+    
+    //configure the shuffle service in NM
+    conf.setStrings(AuxServices.AUX_SERVICES,
+        new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
+    conf.setClass(String.format(AuxServices.AUX_SERVICE_CLASS_FMT,
+        ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
+        Service.class);
+    super.init(conf);
+  }
+
+  private class JobHistoryServerWrapper extends AbstractService {
+    public JobHistoryServerWrapper() {
+      super(JobHistoryServerWrapper.class.getName());
+    }
+
+    @Override
+    public synchronized void start() {
+      try {
+        historyServer = new JobHistoryServer();
+        historyServer.init(getConfig());
+        new Thread() {
+          @Override
+          public void run() {
+            historyServer.start();
+          }
+        }.start();
+        while (historyServer.getServiceState() == STATE.INITED) {
+          LOG.info("Waiting for HistoryServer to start...");
+          Thread.sleep(1500);
+        }
+        if (historyServer.getServiceState() != STATE.STARTED) {
+          throw new IOException("HistoryServer failed to start");
+        }
+        super.start();
+      } catch (Throwable t) {
+        throw new YarnException(t);
+      }
+    }
+  }
+}
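
For reference, the expected lifecycle for tests built on this class is
init -> start -> submit jobs against getConfig() -> stop. A minimal,
illustrative sketch (not part of this patch; the job setup is elided):

    import java.io.File;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;

    public class MiniMRYarnClusterExample {
      public static void main(String[] args) throws Exception {
        MiniMRYarnCluster mrCluster = new MiniMRYarnCluster("example");
        mrCluster.init(new Configuration());  // wires shuffle, staging dir, etc.
        mrCluster.start();                    // brings up the YARN + MR services
        try {
          Job job = new Job(mrCluster.getConfig());
          // ... set mapper/reducer, formats and input/output paths here ...
          job.setJar(new File(MiniMRYarnCluster.APPJAR).getAbsolutePath());
          System.out.println("Succeeded: " + job.waitForCompletion(true));
        } finally {
          mrCluster.stop();                   // tears everything down
        }
      }
    }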

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,222 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.FailingMapper;
+import org.apache.hadoop.RandomTextWriterJob;
+import org.apache.hadoop.SleepJob;
+import org.apache.hadoop.RandomTextWriterJob.RandomInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.YarnServerConfig;
+import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
+import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestMRJobs {
+
+  private static final Log LOG = LogFactory.getLog(TestMRJobs.class);
+
+  private static MiniMRYarnCluster mrCluster;
+
+  @Before
+  public void setup() throws InterruptedException, IOException {
+
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + " not found. Not running test.");
+      return;
+    }
+
+    if (mrCluster == null) {
+      mrCluster = new MiniMRYarnCluster(getClass().getName());
+      mrCluster.init(new Configuration());
+      mrCluster.start();
+    }
+  }
+
+  @Test
+  public void testSleepJob() throws IOException, InterruptedException,
+      ClassNotFoundException { 
+
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + " not found. Not running test.");
+      return;
+    }
+
+    SleepJob sleepJob = new SleepJob();
+    sleepJob.setConf(mrCluster.getConfig());
+    //Job with 3 maps and 2 reduces
+    Job job = sleepJob.createJob(3, 2, 10000, 1, 5000, 1);
+    // TODO: We should not be setting MRAppJar as job.jar. It should be
+    // uploaded separately by YarnRunner.
+    job.setJar(new File(MiniMRYarnCluster.APPJAR).getAbsolutePath());
+    job.waitForCompletion(true);
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+  }
+
+  @Test
+  public void testRandomWriter() throws IOException, InterruptedException,
+      ClassNotFoundException {
+
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + " not found. Not running test.");
+      return;
+    }
+
+    RandomTextWriterJob randomWriterJob = new RandomTextWriterJob();
+    mrCluster.getConfig().set(RandomTextWriterJob.TOTAL_BYTES, "3072");
+    mrCluster.getConfig().set(RandomTextWriterJob.BYTES_PER_MAP, "1024");
+    Job job = randomWriterJob.createJob(mrCluster.getConfig());
+    FileOutputFormat.setOutputPath(job, new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
+        "random-output"));
+    // TODO: We should not be setting MRAppJar as job.jar. It should be
+    // uploaded separately by YarnRunner.
+    job.setJar(new File(MiniMRYarnCluster.APPJAR).getAbsolutePath());
+    job.waitForCompletion(true);
+    Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+  }
+
+  @Test
+  public void testFailingMapper() throws IOException, InterruptedException,
+      ClassNotFoundException {
+
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + " not found. Not running test.");
+      return;
+    }
+
+    int numMaps = 1;
+    mrCluster.getConfig().setInt(MRJobConfig.NUM_MAPS, numMaps);
+    
+    mrCluster.getConfig().setInt("mapreduce.task.timeout", 10*1000);//reduce the timeout
+    mrCluster.getConfig().setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 2); //reduce the no of attempts
+
+    Job job = new Job(mrCluster.getConfig());
+
+    job.setJarByClass(FailingMapper.class);
+    job.setJobName("failmapper");
+
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+
+    job.setInputFormatClass(RandomInputFormat.class);
+    job.setMapperClass(FailingMapper.class);
+
+    job.setOutputFormatClass(TextOutputFormat.class);
+    job.setNumReduceTasks(0);
+    
+    FileOutputFormat.setOutputPath(job, new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
+        "failmapper-output"));
+    // TODO: We should not be setting MRAppJar as job.jar. It should be
+    // uploaded separately by YarnRunner.
+    job.setJar(new File(MiniMRYarnCluster.APPJAR).getAbsolutePath());
+    job.waitForCompletion(true);
+    TaskID taskID = new TaskID(job.getJobID(), TaskType.MAP, 0);
+    TaskAttemptID aId = new TaskAttemptID(taskID, 0);
+    System.out.println("Diagnostics for " + aId + " :");
+    for (String diag : job.getTaskDiagnostics(aId)) {
+      System.out.println(diag);
+    }
+    aId = new TaskAttemptID(taskID, 1);
+    System.out.println("Diagnostics for " + aId + " :");
+    for (String diag : job.getTaskDiagnostics(aId)) {
+      System.out.println(diag);
+    }
+    
+    TaskCompletionEvent[] events = job.getTaskCompletionEvents(0, 2);
+    // Compare against the events' actual status; referencing the FAILED
+    // constant through getStatus() would make the assertion vacuous.
+    Assert.assertEquals(TaskCompletionEvent.Status.FAILED,
+        events[0].getStatus());
+    Assert.assertEquals(TaskCompletionEvent.Status.FAILED,
+        events[1].getStatus());
+    Assert.assertEquals(JobStatus.State.FAILED, job.getJobState());
+  }
+
+  //@Test  // disabled: needs the local Kerberos keytab and principals set below
+  public void testSleepJobWithSecurityOn() throws IOException,
+      InterruptedException, ClassNotFoundException {
+
+    if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+      return;
+    }
+
+    mrCluster.getConfig().set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        "kerberos");
+    mrCluster.getConfig().set(RMConfig.RM_KEYTAB, "/etc/krb5.keytab");
+    mrCluster.getConfig().set(NMConfig.NM_KEYTAB, "/etc/krb5.keytab");
+    mrCluster.getConfig().set(YarnConfiguration.RM_SERVER_PRINCIPAL_KEY,
+        "rm/sightbusy-lx@LOCALHOST");
+    mrCluster.getConfig().set(YarnServerConfig.NM_SERVER_PRINCIPAL_KEY,
+        "nm/sightbusy-lx@LOCALHOST");
+    UserGroupInformation.setConfiguration(mrCluster.getConfig());
+
+    // Keep it in here instead of after RM/NM as multiple user logins happen in
+    // the same JVM.
+    UserGroupInformation user = UserGroupInformation.getCurrentUser();
+
+    LOG.info("User name is " + user.getUserName());
+    for (Token<? extends TokenIdentifier> str : user.getTokens()) {
+      LOG.info("Token is " + str.encodeToUrlString());
+    }
+    user.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {  
+        SleepJob sleepJob = new SleepJob();
+        sleepJob.setConf(mrCluster.getConfig());
+        Job job = sleepJob.createJob(3, 0, 10000, 1, 0, 0);
+        // //Job with reduces
+        // Job job = sleepJob.createJob(3, 2, 10000, 1, 10000, 1);
+        // TODO: We should not be setting MRAppJar as job.jar. It should be
+        // uploaded separately by YarnRunner.
+        job.setJar(new File(MiniMRYarnCluster.APPJAR).getAbsolutePath());
+        job.waitForCompletion(true);
+        Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+        return null;
+      }
+    });
+  }
+
+}
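
A note on the completion-event assertions in testFailingMapper above:
Job.getTaskCompletionEvents(start, max) returns at most max events starting
at offset start, so callers that want every attempt must page through the
results. An illustrative sketch (not part of this patch):

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.TaskCompletionEvent;

    public class CompletionEventDump {
      /** Print each attempt's terminal status for a finished job. */
      public static void dump(Job job) throws Exception {
        int from = 0;
        TaskCompletionEvent[] batch;
        do {
          batch = job.getTaskCompletionEvents(from, 10);  // page size of 10
          for (TaskCompletionEvent event : batch) {
            System.out.println(event.getTaskAttemptId() + " -> "
                + event.getStatus());
          }
          from += batch.length;
        } while (batch.length > 0);
      }
    }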

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-shuffle/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-shuffle/pom.xml?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-shuffle/pom.xml (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-shuffle/pom.xml Thu Mar 17 20:21:13 2011
@@ -0,0 +1,40 @@
+<?xml version="1.0"?><project>
+  <parent>
+    <artifactId>hadoop-mapreduce-client</artifactId>
+    <groupId>org.apache.hadoop</groupId>
+    <version>${yarn.version}</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.hadoop</groupId>
+  <artifactId>hadoop-mapreduce-client-shuffle</artifactId>
+  <name>hadoop-mapreduce-client-shuffle</name>
+  <version>${yarn.version}</version>
+  <url>http://maven.apache.org</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>yarn</artifactId>
+      <version>${yarn.version}</version>
+      <type>pom</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>yarn-server</artifactId>
+      <version>${yarn.version}</version>
+      <type>pom</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>yarn-server-nodemanager</artifactId>
+      <version>${yarn.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${yarn.version}</version>
+    </dependency>
+  </dependencies>
+
+</project>

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,428 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.io.UnsupportedEncodingException;
+import java.net.URL;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.concurrent.Executors;
+import java.util.Map;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.crypto.SecretKey;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.mapred.IndexCache;
+
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelFutureProgressListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.DefaultFileRegion;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.FileRegion;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.codec.frame.TooLongFrameException;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.QueryStringDecoder;
+import org.jboss.netty.handler.stream.ChunkedWriteHandler;
+import org.jboss.netty.util.CharsetUtil;
+
+import static org.jboss.netty.buffer.ChannelBuffers.*;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.*;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.*;
+import static org.jboss.netty.handler.codec.http.HttpMethod.*;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
+import static org.jboss.netty.handler.codec.http.HttpVersion.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.DataOutputByteBuffer;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
+
+import org.apache.hadoop.yarn.ApplicationID;
+
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ApplicationLocalizer;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.AvroUtil;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+
+// DEBUG
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.log4j.Level;
+
+// TODO packaging
+public class ShuffleHandler extends AbstractService 
+    implements AuxServices.AuxiliaryService {
+
+  private static final Log LOG = LogFactory.getLog(ShuffleHandler.class);
+  static {
+    //DEBUG
+    ((Log4JLogger)LOG).getLogger().setLevel(Level.DEBUG);
+  }
+
+  private int port;
+  private ChannelFactory selector;
+  private final ChannelGroup accepted = new DefaultChannelGroup();
+
+  public static final String MAPREDUCE_SHUFFLE_SERVICEID =
+      "mapreduce.shuffle";
+
+  private static final Map<String,String> userRsrc =
+    new ConcurrentHashMap<String,String>();
+  private static final JobTokenSecretManager secretManager =
+    new JobTokenSecretManager();
+
+  public static final String SHUFFLE_PORT = "mapreduce.shuffle.port";
+
+  public ShuffleHandler() {
+    super("httpshuffle");
+  }
+
+  @Override
+  public void initApp(String user, ApplicationID appId, ByteBuffer secret) {
+    // TODO these bytes should be versioned
+    try {
+      DataInputByteBuffer in = new DataInputByteBuffer();
+      in.reset(secret);
+      Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
+      jt.readFields(in);
+      // TODO: Once Shuffle is out of NM, this can use MR APIs
+      JobID jobId = new JobID(Long.toString(appId.clusterTimeStamp), appId.id);
+      userRsrc.put(jobId.toString(), user);
+      LOG.info("Added token for " + jobId.toString());
+      secretManager.addTokenForJob(jobId.toString(), jt);
+    } catch (IOException e) {
+      LOG.error("Error during initApp", e);
+      // TODO add API to AuxiliaryServices to report failures
+    }
+  }
+
+  @Override
+  public void stopApp(ApplicationID appId) {
+    JobID jobId = new JobID(Long.toString(appId.clusterTimeStamp), appId.id);
+    secretManager.removeTokenForJob(jobId.toString());
+  }
+
+  @Override
+  public synchronized void init(Configuration conf) {
+    selector = new NioServerSocketChannelFactory(
+        Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+    super.init(new Configuration(conf));
+  }
+
+  // TODO change AbstractService to throw InterruptedException
+  @Override
+  public synchronized void start() {
+    Configuration conf = getConfig();
+    ServerBootstrap bootstrap = new ServerBootstrap(selector);
+    bootstrap.setPipelineFactory(new HttpPipelineFactory(conf));
+    int port = conf.getInt(SHUFFLE_PORT, 8080);
+    accepted.add(bootstrap.bind(new InetSocketAddress(port)));
+    LOG.info(getName() + " listening on port " + port);
+    super.start();
+  }
+
+  @Override
+  public synchronized void stop() {
+    accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
+    selector.releaseExternalResources(); // shut down the NIO thread pools
+    super.stop();
+  }
+
+  public static class HttpPipelineFactory implements ChannelPipelineFactory {
+
+    private final Shuffle SHUFFLE;
+
+    public HttpPipelineFactory(Configuration conf) {
+      SHUFFLE = new Shuffle(conf);
+    }
+
+    public ChannelPipeline getPipeline() throws Exception {
+      return Channels.pipeline(
+          new HttpRequestDecoder(),
+          new HttpChunkAggregator(1 << 16),
+          new HttpResponseEncoder(),
+          new ChunkedWriteHandler(),
+          SHUFFLE);
+      // TODO factor security manager into pipeline
+      // TODO factor out encode/decode to permit binary shuffle
+      // TODO factor out decode of index to permit alt. models
+    }
+
+  }
+
+  static class Shuffle extends SimpleChannelUpstreamHandler {
+
+    private final Configuration conf;
+    private final IndexCache indexCache;
+    private final LocalDirAllocator lDirAlloc =
+      new LocalDirAllocator(NMConfig.NM_LOCAL_DIR);
+
+    public Shuffle(Configuration conf) {
+      this.conf = conf;
+      indexCache = new IndexCache(new JobConf(conf));
+    }
+
+    private static List<String> splitMaps(List<String> mapq) {
+      if (null == mapq) {
+        return null;
+      }
+      final List<String> ret = new ArrayList<String>();
+      for (String s : mapq) {
+        Collections.addAll(ret, s.split(","));
+      }
+      return ret;
+    }
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
+        throws Exception {
+      HttpRequest request = (HttpRequest) evt.getMessage();
+      if (request.getMethod() != GET) {
+        sendError(ctx, METHOD_NOT_ALLOWED);
+        return;
+      }
+      final Map<String,List<String>> q =
+        new QueryStringDecoder(request.getUri()).getParameters();
+      final List<String> mapIds = splitMaps(q.get("map"));
+      final List<String> reduceQ = q.get("reduce");
+      final List<String> jobQ = q.get("job");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("RECV: " + request.getUri() +
+            "\n  mapId: " + mapIds +
+            "\n  reduceId: " + reduceQ +
+            "\n  jobId: " + jobQ);
+      }
+
+      if (mapIds == null || reduceQ == null || jobQ == null) {
+        sendError(ctx, "Required param job, map and reduce", BAD_REQUEST);
+        return;
+      }
+      if (reduceQ.size() != 1 || jobQ.size() != 1) {
+        sendError(ctx, "Too many job/reduce parameters", BAD_REQUEST);
+        return;
+      }
+      int reduceId;
+      String jobId;
+      try {
+        reduceId = Integer.parseInt(reduceQ.get(0));
+        jobId = jobQ.get(0);
+      } catch (NumberFormatException e) {
+        sendError(ctx, "Bad reduce parameter", BAD_REQUEST);
+        return;
+      } catch (IllegalArgumentException e) {
+        sendError(ctx, "Bad job parameter", BAD_REQUEST);
+        return;
+      }
+      final String reqUri = request.getUri();
+      if (null == reqUri) {
+        // TODO? add upstream?
+        sendError(ctx, FORBIDDEN);
+        return;
+      }
+      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+      try {
+        verifyRequest(jobId, ctx, request, response,
+            new URL("http", "", 8080, reqUri));
+      } catch (IOException e) {
+        LOG.warn("Shuffle failure ", e);
+        sendError(ctx, e.getMessage(), UNAUTHORIZED);
+        return;
+      }
+
+      Channel ch = evt.getChannel();
+      ch.write(response);
+      // TODO refactor the following into the pipeline
+      ChannelFuture lastMap = null;
+      for (String mapId : mapIds) {
+        try {
+          lastMap =
+            sendMapOutput(ctx, ch, userRsrc.get(jobId), jobId, mapId, reduceId);
+          if (null == lastMap) {
+            sendError(ctx, NOT_FOUND);
+            return;
+          }
+        } catch (IOException e) {
+          LOG.error("Shuffle error ", e);
+          sendError(ctx, e.getMessage(), INTERNAL_SERVER_ERROR);
+          return;
+        }
+      }
+      if (lastMap != null) {
+        // Close the channel once the last map output has been flushed.
+        lastMap.addListener(ChannelFutureListener.CLOSE);
+      }
+    }
+
+    private void verifyRequest(String appid, ChannelHandlerContext ctx,
+        HttpRequest request, HttpResponse response, URL requestUri)
+        throws IOException {
+      SecretKey tokenSecret = secretManager.retrieveTokenSecret(appid);
+      if (null == tokenSecret) {
+        LOG.info("Request for unknown token " + appid);
+        throw new IOException("could not find jobid");
+      }
+      // string to encrypt
+      String enc_str = SecureShuffleUtils.buildMsgFrom(requestUri);
+      // hash from the fetcher
+      String urlHashStr =
+        request.getHeader(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
+      if (urlHashStr == null) {
+        LOG.info("Missing header hash for " + appid);
+        throw new IOException("fetcher cannot be authenticated");
+      }
+      if (LOG.isDebugEnabled()) {
+        int len = urlHashStr.length();
+        LOG.debug("verifying request. enc_str=" + enc_str + "; hash=..." +
+            urlHashStr.substring(len-len/2, len-1));
+      }
+      // verify - throws exception
+      SecureShuffleUtils.verifyReply(urlHashStr, enc_str, tokenSecret);
+      // verification passed - encode the reply
+      String reply =
+        SecureShuffleUtils.generateHash(urlHashStr.getBytes(), tokenSecret);
+      response.setHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
+      if (LOG.isDebugEnabled()) {
+        int len = reply.length();
+        LOG.debug("Fetcher request verfied. enc_str=" + enc_str + ";reply=" +
+            reply.substring(len-len/2, len-1));
+      }
+    }
+
+    protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch,
+        String user, String jobId, String mapId, int reduce)
+        throws IOException {
+      // TODO replace w/ rsrc alloc
+      // $x/$user/appcache/$appId/output/$mapId
+      // TODO: Once Shuffle is out of NM, this can use MR APIs to convert between App and Job
+      JobID jobID = JobID.forName(jobId);
+      ApplicationID appID = new ApplicationID();
+      appID.clusterTimeStamp = Long.parseLong(jobID.getJtIdentifier());
+      appID.id = jobID.getId();
+      final String base =
+          ApplicationLocalizer.USERCACHE + "/" + user + "/"
+              + ApplicationLocalizer.APPCACHE + "/"
+              + AvroUtil.toString(appID) + "/output" + "/" + mapId;
+      LOG.debug("DEBUG0 " + base);
+      // Index file
+      Path indexFileName = lDirAlloc.getLocalPathToRead(
+          base + "/file.out.index", conf);
+      // Map-output file
+      Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
+          base + "/file.out", conf);
+      LOG.debug("DEBUG1 " + base + " : " + mapOutputFileName + " : " +
+          indexFileName);
+      IndexRecord info = 
+        indexCache.getIndexInformation(mapId, reduce, indexFileName);
+      final ShuffleHeader header =
+        new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce);
+      final DataOutputBuffer dob = new DataOutputBuffer();
+      header.write(dob);
+      ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+      File spillfile = new File(mapOutputFileName.toString());
+      RandomAccessFile spill;
+      try {
+        spill = new RandomAccessFile(spillfile, "r");
+      } catch (FileNotFoundException e) {
+        LOG.info(spillfile + " not found");
+        return null;
+      }
+      final FileRegion partition = new DefaultFileRegion(
+          spill.getChannel(), info.startOffset, info.partLength);
+      ChannelFuture writeFuture = ch.write(partition);
+      writeFuture.addListener(new ChannelFutureListener() {
+          // TODO error handling; distinguish IO/connection failures,
+          //      attribute to appropriate spill output
+          @Override
+          public void operationComplete(ChannelFuture future) {
+            partition.releaseExternalResources();
+          }
+        });
+      return writeFuture;
+    }
+
+    private void sendError(ChannelHandlerContext ctx,
+        HttpResponseStatus status) {
+      sendError(ctx, "", status);
+    }
+
+    private void sendError(ChannelHandlerContext ctx, String message,
+        HttpResponseStatus status) {
+      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
+      response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
+      response.setContent(
+        ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
+
+      // Close the connection as soon as the error message is sent.
+      ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+        throws Exception {
+      Channel ch = e.getChannel();
+      Throwable cause = e.getCause();
+      if (cause instanceof TooLongFrameException) {
+        sendError(ctx, BAD_REQUEST);
+        return;
+      }
+
+      LOG.error("Shuffle error", cause);
+      if (ch.isConnected()) {
+        sendError(ctx, INTERNAL_SERVER_ERROR);
+      }
+    }
+
+  }
+}
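
For context on verifyRequest() above: the handler expects the reduce-side
fetcher to send a GET whose query string carries job, map and reduce
parameters and whose SecureShuffleUtils.HTTP_HEADER_URL_HASH header signs the
URL with the job token; the handler acknowledges via
HTTP_HEADER_REPLY_URL_HASH. A sketch of the client side of that handshake
(illustrative; the host/port, the /mapOutput path and the connection plumbing
are assumptions — only the hashing calls and header names come from the
shuffle code):

    import java.io.IOException;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import javax.crypto.SecretKey;
    import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;

    public class ShuffleFetchSketch {
      /** Open a signed shuffle connection for one map output. */
      public static HttpURLConnection openShuffleUrl(String host, int port,
          String jobId, String mapId, int reduceId, SecretKey jobTokenSecret)
          throws IOException {
        URL url = new URL("http://" + host + ":" + port + "/mapOutput?job="
            + jobId + "&reduce=" + reduceId + "&map=" + mapId);
        // Canonicalize and sign the URL with the job token; this is what
        // verifyRequest() recomputes and checks on the server side.
        String msg = SecureShuffleUtils.buildMsgFrom(url);
        String hash = SecureShuffleUtils.hashFromString(msg, jobTokenSecret);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, hash);
        return conn;
      }
    }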