Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/03/17 21:21:54 UTC
svn commit: r1082677 [15/38] - in /hadoop/mapreduce/branches/MR-279: ./
assembly/ ivy/ mr-client/ mr-client/hadoop-mapreduce-client-app/
mr-client/hadoop-mapreduce-client-app/src/
mr-client/hadoop-mapreduce-client-app/src/main/ mr-client/hadoop-mapredu...
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,303 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.List;
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
+import org.apache.hadoop.yarn.security.SchedulerSecurityInfo;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ApplicationMaster;
+import org.apache.hadoop.yarn.ApplicationState;
+import org.apache.hadoop.yarn.YarnRemoteException;
+import org.apache.hadoop.mapreduce.v2.api.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.TaskReport;
+
+public class ClientServiceDelegate {
+ private static final Log LOG = LogFactory.getLog(ClientServiceDelegate.class);
+ private Configuration conf;
+ private ApplicationID appId;
+ private final ResourceMgrDelegate rm;
+ private MRClientProtocol realProxy = null;
+ private String serviceAddr = "";
+ private String serviceHttpAddr = "";
+
+ ClientServiceDelegate(Configuration conf, ResourceMgrDelegate rm,
+ ApplicationID appId) throws AvroRemoteException {
+ this.conf = conf;
+ this.rm = rm;
+ this.appId = appId;
+ if (appId != null) {
+ refreshProxy();
+ }
+ }
+
+ private void refreshProxy() throws AvroRemoteException {
+ ApplicationMaster appMaster = rm.getApplicationMaster(appId);
+ if (ApplicationState.COMPLETED.equals(appMaster.state)) {
+ serviceAddr = conf.get("jobhistory.server.hostname") + ":"
+ + conf.get("jobhistory.server.port");
+ LOG.debug("Reconnecting to job history server " + serviceAddr);
+ } else {
+ /* TODO check to confirm it's really launched */
+ serviceAddr = appMaster.host + ":" + appMaster.rpcPort;
+ serviceHttpAddr = appMaster.host + ":" + appMaster.httpPort;
+ }
+ try {
+ instantiateProxy(serviceAddr);
+ } catch (IOException e) {
+ throw new YarnException(e);
+ }
+ }
+
+ void instantiateProxy(ApplicationID applicationId, ApplicationMaster appMaster)
+ throws IOException {
+ try {
+ this.appId = applicationId;
+ LOG.info("Trying to connect to the ApplicationManager of"
+ + " application " + applicationId + " running at " + appMaster);
+ UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+ serviceAddr = appMaster.host + ":"
+ + appMaster.rpcPort;
+ serviceHttpAddr = appMaster.host + ":" + appMaster.httpPort;
+ if (UserGroupInformation.isSecurityEnabled()) {
+ String clientTokenEncoded = appMaster.clientToken.toString();
+ Token<ApplicationTokenIdentifier> clientToken = new Token<ApplicationTokenIdentifier>();
+ clientToken.decodeFromUrlString(clientTokenEncoded);
+ clientToken.setService(new Text(appMaster.host.toString() + ":"
+ + appMaster.rpcPort));
+ currentUser.addToken(clientToken);
+ }
+ instantiateProxy(serviceAddr);
+ LOG.info("Connection to the ApplicationManager established.");
+ } catch (IOException e) {
+ throw new IOException("Failed to connect to the ApplicationMaster at "
+ + serviceAddr, e);
+ }
+ }
+
+ private void instantiateProxy(final String serviceAddr) throws IOException {
+ UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
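+ // Create the proxy inside doAs() so the RPC layer uses the caller's UGI,
+ // including any client token added to it when security is enabled.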
+ realProxy = currentUser.doAs(new PrivilegedAction<MRClientProtocol>() {
+ @Override
+ public MRClientProtocol run() {
+ Configuration myConf = new Configuration(conf);
+ myConf.setClass(
+ CommonConfigurationKeysPublic.HADOOP_SECURITY_INFO_CLASS_NAME,
+ SchedulerSecurityInfo.class, SecurityInfo.class);
+ YarnRPC rpc = YarnRPC.create(myConf);
+ return (MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
+ NetUtils.createSocketAddr(serviceAddr), myConf);
+ }
+ });
+ }
+
+ public org.apache.hadoop.mapreduce.Counters getJobCounters(JobID arg0) throws IOException,
+ InterruptedException {
+ org.apache.hadoop.mapreduce.v2.api.JobID jobID = TypeConverter.toYarn(arg0);
+ appId = jobID.appID;
+ if (realProxy == null) refreshProxy();
+ try {
+ return TypeConverter.fromYarn(realProxy.getCounters(jobID));
+ } catch(Exception e) {
+ LOG.debug("Failing to contact application master", e);
+ refreshProxy();
+ return TypeConverter.fromYarn(realProxy.getCounters(jobID));
+ }
+ }
+
+ public String getJobHistoryDir() throws IOException, InterruptedException {
+ //TODO fix this
+ return "";
+ }
+
+ public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1,
+ int arg2) throws IOException, InterruptedException {
+ org.apache.hadoop.mapreduce.v2.api.JobID jobID = TypeConverter.toYarn(arg0);
+ appId = jobID.appID;
+ if (realProxy == null) refreshProxy();
+
+ List<org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent> list = null;
+ try {
+ list = realProxy.getTaskAttemptCompletionEvents(jobID,
+ arg1, arg2);
+ } catch(Exception e) {
+ LOG.debug("Failed to contact application master ", e);
+ refreshProxy();
+ list = realProxy.getTaskAttemptCompletionEvents(jobID,
+ arg1, arg2);
+ }
+ return TypeConverter.fromYarn(
+ list.toArray(new org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent[0]));
+ }
+
+ public String[] getTaskDiagnostics(org.apache.hadoop.mapreduce.TaskAttemptID
+ arg0)
+ throws IOException,
+ InterruptedException {
+
+ List<CharSequence> list = null;
+ org.apache.hadoop.mapreduce.v2.api.TaskAttemptID attemptID = TypeConverter.toYarn(arg0);
+ appId = attemptID.taskID.jobID.appID;
+ if (realProxy == null) refreshProxy();
+
+ try {
+ list = realProxy.getDiagnostics(attemptID);
+ } catch(Exception e) {
+ LOG.debug("Failed to contact application master ", e);
+ refreshProxy();
+ list = realProxy.getDiagnostics(attemptID);
+ }
+ String[] result = new String[list.size()];
+ int i = 0;
+ for (CharSequence c : list) {
+ result[i++] = c.toString();
+ }
+ return result;
+ }
+
+ //this method is here due to package restriction of
+ //TaskReport constructor
+ public static org.apache.hadoop.mapred.TaskReport[] fromYarn(
+ List<TaskReport> reports) {
+ org.apache.hadoop.mapred.TaskReport[] result =
+ new org.apache.hadoop.mapred.TaskReport[reports.size()];
+ int i = 0;
+ for (TaskReport report : reports) {
+ List<CharSequence> diag = report.diagnostics;
+ String[] diagnosticArr = new String[diag.size()];
+ int j = 0;
+ for (CharSequence c : diag) {
+ diagnosticArr[j++] = c.toString();
+ }
+ org.apache.hadoop.mapred.TaskReport oldReport =
+ new org.apache.hadoop.mapred.TaskReport(
+ TypeConverter.fromYarn(report.id), report.progress,
+ report.state.toString(),
+ diagnosticArr, TypeConverter.fromYarn(report.state),
+ report.startTime, report.finishTime,
+ new org.apache.hadoop.mapred.Counters(
+ TypeConverter.fromYarn(report.counters)));
+ result[i++] = oldReport;
+ }
+ return result;
+ }
+
+
+ public JobReport getJobReport(org.apache.hadoop.mapreduce.v2.api.JobID jobID)
+ throws AvroRemoteException, YarnRemoteException {
+ appId = jobID.appID;
+ if (realProxy == null) refreshProxy();
+
+ try {
+ return realProxy.getJobReport(jobID);
+ } catch (Exception e) {
+ LOG.debug("Failed to contact application master ", e);
+ refreshProxy();
+ return realProxy.getJobReport(jobID);
+ }
+ }
+
+ public JobStatus getJobStatus(org.apache.hadoop.mapreduce.v2.api.JobID jobId)
+ throws AvroRemoteException, YarnRemoteException {
+ appId = jobId.appID;
+ if (realProxy == null) refreshProxy();
+ String stagingDir = conf.get("yarn.apps.stagingDir");
+ String jobFile = stagingDir + "/" + jobId.toString();
+ return TypeConverter.fromYarn(getJobReport(jobId), jobFile, serviceHttpAddr);
+ }
+
+
+ public JobStatus getJobStatus(JobID jobID) throws YarnRemoteException,
+ AvroRemoteException {
+ return getJobStatus(TypeConverter.toYarn(jobID));
+ }
+
+ public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(JobID jobID, TaskType taskType)
+ throws YarnRemoteException, AvroRemoteException {
+ List<TaskReport> taskReports = null;
+ org.apache.hadoop.mapreduce.v2.api.JobID nJobID = TypeConverter.toYarn(jobID);
+ appId = nJobID.appID;
+ if (realProxy == null) refreshProxy();
+
+ try {
+ taskReports = realProxy.getTaskReports(nJobID,
+ TypeConverter.toYarn(taskType));
+ } catch(Exception e) {
+ LOG.debug("Failed to contact application master ", e);
+ refreshProxy();
+ taskReports = realProxy.getTaskReports(nJobID,
+ TypeConverter.toYarn(taskType));
+ }
+ // toArray() with no argument returns Object[], which cannot be cast to
+ // TaskReport[]; pass a typed array instead.
+ return TypeConverter.fromYarn(taskReports).toArray(
+ new org.apache.hadoop.mapreduce.TaskReport[taskReports.size()]);
+ }
+
+ public Void killJob(JobID jobID) throws YarnRemoteException,
+ AvroRemoteException {
+ org.apache.hadoop.mapreduce.v2.api.JobID nJobID = TypeConverter.toYarn(jobID);
+ appId = nJobID.appID;
+ if (realProxy == null) refreshProxy();
+
+ try {
+ realProxy.killJob(nJobID);
+ } catch(Exception e) {
+ LOG.debug("Failed to contact application master ", e);
+ refreshProxy();
+ realProxy.killJob(nJobID);
+ }
+ return null;
+ }
+
+ public boolean killTask(TaskAttemptID taskAttemptID, boolean killed)
+ throws YarnRemoteException, AvroRemoteException {
+ org.apache.hadoop.mapreduce.v2.api.TaskAttemptID attemptID
+ = TypeConverter.toYarn(taskAttemptID);
+ appId = attemptID.taskID.jobID.appID;
+ if (realProxy == null) refreshProxy();
+
+ try {
+ realProxy.killTaskAttempt(attemptID);
+ } catch(Exception e) {
+ LOG.debug("Failed to contact application master ", e);
+ refreshProxy();
+ realProxy.killTaskAttempt(attemptID);
+ }
+ return true;
+ }
+}
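
The pattern above repeats in every public method of ClientServiceDelegate:
issue the RPC, and if it fails, rebuild the proxy (the ApplicationMaster may
have finished and handed off to the job history server) and retry exactly
once. A minimal sketch of that idiom in isolation, using a hypothetical
Callable/refresh pair rather than the real MRClientProtocol calls:

    import java.io.IOException;
    import java.util.concurrent.Callable;

    public class RetryOnceSketch {
      // Run the call; on any failure, refresh the target once and retry.
      static <T> T invoke(Callable<T> call, Runnable refreshProxy)
          throws IOException {
        try {
          return call.call();
        } catch (Exception first) {
          refreshProxy.run();       // e.g. re-resolve AM vs. history server
          try {
            return call.call();     // second and final attempt
          } catch (Exception second) {
            throw new IOException("Call failed after proxy refresh", second);
          }
        }
      }
    }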
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,226 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.QueueAclsInfo;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ApplicationMaster;
+import org.apache.hadoop.yarn.ApplicationState;
+import org.apache.hadoop.yarn.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.ClientRMProtocol;
+import org.apache.hadoop.yarn.YarnClusterMetrics;
+
+// TODO: This should be part of something like yarn-client.
+public class ResourceMgrDelegate {
+ private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class);
+
+ private Configuration conf;
+ ClientRMProtocol applicationsManager;
+ private ApplicationID applicationId;
+
+ public ResourceMgrDelegate(Configuration conf) throws UnsupportedFileSystemException {
+ this.conf = conf;
+ YarnRPC rpc = YarnRPC.create(conf);
+ InetSocketAddress rmAddress =
+ NetUtils.createSocketAddr(conf.get(
+ YarnConfiguration.APPSMANAGER_ADDRESS,
+ YarnConfiguration.DEFAULT_APPSMANAGER_BIND_ADDRESS));
+ LOG.info("Connecting to ResourceManager at " + rmAddress);
+ Configuration appsManagerServerConf = new Configuration(this.conf);
+ appsManagerServerConf.setClass(
+ CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME,
+ ClientRMSecurityInfo.class, SecurityInfo.class);
+ applicationsManager =
+ (ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class,
+ rmAddress, appsManagerServerConf);
+ LOG.info("Connected to ResourceManager at " + rmAddress);
+ }
+
+ public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
+ throws IOException, InterruptedException {
+ return;
+ }
+
+
+ public TaskTrackerInfo[] getActiveTrackers() throws IOException,
+ InterruptedException {
+ return null;
+ }
+
+
+ public JobStatus[] getAllJobs() throws IOException, InterruptedException {
+ return null;
+ }
+
+
+ public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException,
+ InterruptedException {
+ throw new IOException("Not implemented");
+ }
+
+
+ public QueueInfo[] getChildQueues(String arg0) throws IOException,
+ InterruptedException {
+ throw new IOException("Not implemented");
+ }
+
+
+ public ClusterMetrics getClusterMetrics() throws IOException,
+ InterruptedException {
+ YarnClusterMetrics metrics = applicationsManager.getClusterMetrics();
+ ClusterMetrics oldMetrics = new ClusterMetrics(1, 1, 1, 1, 1, 1,
+ metrics.numNodeManagers * 10, metrics.numNodeManagers * 2, 1,
+ metrics.numNodeManagers, 0, 0);
+ return oldMetrics;
+ }
+
+
+ public Token<DelegationTokenIdentifier> getDelegationToken(Text arg0)
+ throws IOException, InterruptedException {
+ throw new IOException("Not Implemented");
+ }
+
+
+ public String getFilesystemName() throws IOException, InterruptedException {
+ return FileSystem.get(conf).getUri().toString();
+ }
+
+ public JobID getNewJobID() throws IOException, InterruptedException {
+ applicationId = applicationsManager.getNewApplicationId();
+ return TypeConverter.fromYarn(applicationId);
+ }
+
+
+ public QueueInfo getQueue(String arg0) throws IOException,
+ InterruptedException {
+ throw new IOException("Not implemented");
+ }
+
+
+ public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
+ InterruptedException {
+ throw new IOException("Not implemented");
+ }
+
+
+ public QueueInfo[] getQueues() throws IOException, InterruptedException {
+ throw new IOException("Not implemented");
+ }
+
+
+ public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
+ throw new IOException("Not Implemented");
+ }
+
+
+ public String getStagingAreaDir() throws IOException, InterruptedException {
+// Path path = new Path(MRJobConstants.JOB_SUBMIT_DIR);
+ Path path = new Path(conf.get(YARNApplicationConstants.APPS_STAGING_DIR_KEY));
+ LOG.info("DEBUG --- getStagingAreaDir: dir=" + path);
+ return path.toString();
+ }
+
+
+ public String getSystemDir() throws IOException, InterruptedException {
+ Path sysDir = new Path(
+ YARNApplicationConstants.JOB_SUBMIT_DIR);
+ FileContext.getFileContext(conf).delete(sysDir, true);
+ return sysDir.toString();
+ }
+
+
+ public long getTaskTrackerExpiryInterval() throws IOException,
+ InterruptedException {
+ return 0;
+ }
+
+ public void setJobPriority(JobID arg0, String arg1) throws IOException,
+ InterruptedException {
+ return;
+ }
+
+
+ public long getProtocolVersion(String arg0, long arg1) throws IOException {
+ return 0;
+ }
+
+ public long renewDelegationToken(Token<DelegationTokenIdentifier> arg0)
+ throws IOException, InterruptedException {
+ throw new IOException("Not implemented");
+ }
+
+
+ public ApplicationID submitApplication(ApplicationSubmissionContext appContext)
+ throws IOException {
+ appContext.applicationId = applicationId;
+ applicationsManager.submitApplication(appContext);
+ LOG.info("Submitted application " + applicationId + " to ResourceManager");
+ return applicationId;
+ }
+
+ public ApplicationMaster getApplicationMaster(ApplicationID appId)
+ throws AvroRemoteException {
+ ApplicationMaster appMaster =
+ applicationsManager.getApplicationMaster(appId);
+ while (appMaster.state != ApplicationState.RUNNING &&
+ appMaster.state != ApplicationState.KILLED &&
+ appMaster.state != ApplicationState.FAILED &&
+ appMaster.state != ApplicationState.COMPLETED) {
+ appMaster = applicationsManager.getApplicationMaster(appId);
+ try {
+ LOG.info("Waiting for appMaster to start..");
+ Thread.sleep(2000);
+ } catch(InterruptedException ie) {
+ //DO NOTHING
+ }
+ }
+ return appMaster;
+ }
+
+ public ApplicationID getApplicationId() {
+ return applicationId;
+ }
+}
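
getApplicationMaster() above blocks by polling: it re-queries the
ResourceManager every two seconds until the application reaches RUNNING or a
terminal state. The same loop in isolation (the Status enum and fetch() hook
are illustrative stand-ins, not YARN API):

    import java.util.EnumSet;

    public class PollUntilSettledSketch {
      enum Status { NEW, SCHEDULED, RUNNING, COMPLETED, FAILED, KILLED }

      static final EnumSet<Status> SETTLED = EnumSet.of(
          Status.RUNNING, Status.COMPLETED, Status.FAILED, Status.KILLED);

      interface StatusSource { Status fetch(); }

      // Re-fetch until the application settles, pausing between attempts.
      static Status await(StatusSource src, long sleepMillis)
          throws InterruptedException {
        Status s = src.fetch();
        while (!SETTLED.contains(s)) {
          Thread.sleep(sleepMillis);
          s = src.fetch();
        }
        return s;
      }
    }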
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,533 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.QueueAclsInfo;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.TaskReport;
+import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.AvroUtil;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ApplicationMaster;
+import org.apache.hadoop.yarn.ApplicationState;
+import org.apache.hadoop.yarn.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.LocalResource;
+import org.apache.hadoop.yarn.LocalResourceType;
+import org.apache.hadoop.yarn.LocalResourceVisibility;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.URL;
+
+/**
+ * This class enables the current JobClient (0.22 hadoop) to run on YARN.
+ */
+public class YARNRunner implements ClientProtocol {
+
+ private static final Log LOG = LogFactory.getLog(YARNRunner.class);
+
+ public static final String YARN_AM_RESOURCE_KEY = "yarn.am.mapreduce.resource.mb";
+ private static final int DEFAULT_YARN_AM_RESOURCE = 1024;
+
+ private ResourceMgrDelegate resMgrDelegate;
+ private ClientServiceDelegate clientServiceDelegate;
+ private YarnConfiguration conf;
+
+ /**
+ * YARNRunner encapsulates the client interface of
+ * YARN
+ * @param conf the configuration object for the client
+ */
+ public YARNRunner(Configuration conf, ApplicationID appID)
+ throws AvroRemoteException {
+ this.conf = new YarnConfiguration(conf);
+ try {
+ this.resMgrDelegate = new ResourceMgrDelegate(conf);
+ this.clientServiceDelegate = new ClientServiceDelegate(conf,
+ resMgrDelegate, appID);
+ } catch (UnsupportedFileSystemException ufe) {
+ throw new RuntimeException("Error in instantiating YarnClient", ufe);
+ }
+ }
+
+ /**
+ * YARNRunner encapsulates the client interface of
+ * YARN
+ * @param conf the configuration object for the client
+ */
+ public YARNRunner(Configuration conf) throws AvroRemoteException {
+ this(conf, null);
+ }
+ @Override
+ public void cancelDelegationToken(Token<DelegationTokenIdentifier> arg0)
+ throws IOException, InterruptedException {
+ resMgrDelegate.cancelDelegationToken(arg0);
+ }
+
+ @Override
+ public TaskTrackerInfo[] getActiveTrackers() throws IOException,
+ InterruptedException {
+ throw new IOException("Not implemented");
+ }
+
+ @Override
+ public JobStatus[] getAllJobs() throws IOException, InterruptedException {
+ return resMgrDelegate.getAllJobs();
+ }
+
+ @Override
+ public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException,
+ InterruptedException {
+ return resMgrDelegate.getBlacklistedTrackers();
+ }
+
+ @Override
+ public QueueInfo[] getChildQueues(String arg0) throws IOException,
+ InterruptedException {
+ return resMgrDelegate.getChildQueues(arg0);
+ }
+
+ @Override
+ public ClusterMetrics getClusterMetrics() throws IOException,
+ InterruptedException {
+ return resMgrDelegate.getClusterMetrics();
+ }
+
+ @Override
+ public Token<DelegationTokenIdentifier> getDelegationToken(Text arg0)
+ throws IOException, InterruptedException {
+ return resMgrDelegate.getDelegationToken(arg0);
+ }
+
+ @Override
+ public String getFilesystemName() throws IOException, InterruptedException {
+ return resMgrDelegate.getFilesystemName();
+ }
+
+ @Override
+ public JobID getNewJobID() throws IOException, InterruptedException {
+ return resMgrDelegate.getNewJobID();
+ }
+
+ @Override
+ public QueueInfo getQueue(String arg0) throws IOException,
+ InterruptedException {
+ return resMgrDelegate.getQueue(arg0);
+ }
+
+ @Override
+ public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
+ InterruptedException {
+ return resMgrDelegate.getQueueAclsForCurrentUser();
+ }
+
+ @Override
+ public QueueInfo[] getQueues() throws IOException, InterruptedException {
+ return resMgrDelegate.getQueues();
+ }
+
+ @Override
+ public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
+ return resMgrDelegate.getRootQueues();
+ }
+
+ @Override
+ public String getStagingAreaDir() throws IOException, InterruptedException {
+ return resMgrDelegate.getStagingAreaDir();
+ }
+
+ @Override
+ public String getSystemDir() throws IOException, InterruptedException {
+ return resMgrDelegate.getSystemDir();
+ }
+
+ @Override
+ public long getTaskTrackerExpiryInterval() throws IOException,
+ InterruptedException {
+ return resMgrDelegate.getTaskTrackerExpiryInterval();
+ }
+
+ @Override
+ public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
+ throws IOException, InterruptedException{
+
+ // Upload only in security mode: TODO
+ Path applicationTokensFile =
+ new Path(jobSubmitDir, YarnConfiguration.APPLICATION_TOKENS_FILE);
+ try {
+ ts.writeTokenStorageFile(applicationTokensFile, conf);
+ } catch (IOException e) {
+ throw new YarnException(e);
+ }
+
+ // XXX Remove
+ Path submitJobDir = new Path(jobSubmitDir);
+ FileContext defaultFS = FileContext.getFileContext(conf);
+ Path submitJobFile =
+ defaultFS.makeQualified(JobSubmissionFiles.getJobConfPath(submitJobDir));
+ FSDataInputStream in = defaultFS.open(submitJobFile);
+ conf.addResource(in);
+ // ---
+
+ // Construct necessary information to start the MR AM
+ ApplicationSubmissionContext appContext =
+ getApplicationSubmissionContext(conf, jobSubmitDir, ts);
+ setupDistributedCache(conf, appContext);
+
+ // XXX Remove
+ in.close();
+ // ---
+
+ // Submit to ResourceManager
+ ApplicationID applicationId = resMgrDelegate.submitApplication(appContext);
+
+ ApplicationMaster appMaster =
+ resMgrDelegate.getApplicationMaster(applicationId);
+ if (appMaster.state == ApplicationState.FAILED || appMaster.state ==
+ ApplicationState.KILLED) {
+ throw new AvroRemoteException("failed to run job");
+ }
+ clientServiceDelegate.instantiateProxy(applicationId, appMaster);
+ return clientServiceDelegate.getJobStatus(jobId);
+ }
+
+ private LocalResource createApplicationResource(FileContext fs, Path p)
+ throws IOException {
+ LocalResource rsrc = new LocalResource();
+ FileStatus rsrcStat = fs.getFileStatus(p);
+ rsrc.resource = AvroUtil.getYarnUrlFromPath(rsrcStat.getPath());
+ rsrc.size = rsrcStat.getLen();
+ rsrc.timestamp = rsrcStat.getModificationTime();
+ rsrc.type = LocalResourceType.FILE;
+ rsrc.state = LocalResourceVisibility.APPLICATION;
+ return rsrc;
+ }
+
+ private ApplicationSubmissionContext getApplicationSubmissionContext(
+ Configuration jobConf,
+ String jobSubmitDir, Credentials ts) throws IOException {
+ ApplicationSubmissionContext appContext =
+ new ApplicationSubmissionContext();
+ ApplicationID applicationId = resMgrDelegate.getApplicationId();
+ appContext.applicationId = applicationId;
+ Resource capability = new Resource();
+ capability.memory =
+ conf.getInt(YARN_AM_RESOURCE_KEY, DEFAULT_YARN_AM_RESOURCE);
+ LOG.info("Master capability = " + capability);
+ appContext.masterCapability = capability;
+
+ FileContext defaultFS = FileContext.getFileContext(conf);
+ Path jobConfPath = new Path(jobSubmitDir, YARNApplicationConstants.JOB_CONF_FILE);
+
+ URL yarnUrlForJobSubmitDir =
+ AvroUtil.getYarnUrlFromPath(defaultFS.makeQualified(new Path(
+ jobSubmitDir)));
+ appContext.resources = new HashMap<CharSequence, URL>();
+ LOG.debug("Creating setup context, jobSubmitDir url is "
+ + yarnUrlForJobSubmitDir);
+
+ appContext.resources.put(YARNApplicationConstants.JOB_SUBMIT_DIR,
+ yarnUrlForJobSubmitDir);
+
+ appContext.resources_todo = new HashMap<CharSequence,LocalResource>();
+ appContext.resources_todo.put(YARNApplicationConstants.JOB_CONF_FILE,
+ createApplicationResource(defaultFS,
+ jobConfPath));
+ appContext.resources_todo.put(YARNApplicationConstants.JOB_JAR,
+ createApplicationResource(defaultFS,
+ new Path(jobSubmitDir, YARNApplicationConstants.JOB_JAR)));
+ // TODO gross hack
+ for (String s : new String[] { "job.split", "job.splitmetainfo",
+ YarnConfiguration.APPLICATION_TOKENS_FILE }) {
+ appContext.resources_todo.put(
+ YARNApplicationConstants.JOB_SUBMIT_DIR + "/" + s,
+ createApplicationResource(defaultFS,
+ new Path(jobSubmitDir, s)));
+ }
+
+ // TODO: Only if security is on.
+ List<CharSequence> fsTokens = new ArrayList<CharSequence>();
+ for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
+ fsTokens.add(token.encodeToUrlString());
+ }
+
+ // TODO - Remove this!
+ appContext.fsTokens = fsTokens;
+ DataOutputBuffer dob = new DataOutputBuffer();
+ ts.writeTokenStorageToStream(dob);
+ appContext.fsTokens_todo =
+ ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+
+ // Add queue information
+ appContext.queue =
+ jobConf.get(JobContext.QUEUE_NAME, JobConf.DEFAULT_QUEUE_NAME);
+
+ // Add job name
+ appContext.applicationName = jobConf.get(JobContext.JOB_NAME, "N/A");
+
+ // Add the command line
+ String javaHome = "$JAVA_HOME";
+ Vector<CharSequence> vargs = new Vector<CharSequence>(8);
+ vargs.add(javaHome + "/bin/java");
+ vargs.add(conf.get(YARNApplicationConstants.MR_APPMASTER_COMMAND_OPTS,
+ "-Dhadoop.root.logger=DEBUG,console -Xmx1024m"));
+
+ // Add { job jar, MR app jar } to classpath.
+ appContext.environment = new HashMap<CharSequence, CharSequence>();
+ MRApps.setInitialClasspath(appContext.environment);
+ MRApps.addToClassPath(appContext.environment,
+ YARNApplicationConstants.JOB_JAR);
+ MRApps.addToClassPath(appContext.environment,
+ YARNApplicationConstants.YARN_MAPREDUCE_APP_JAR_PATH);
+ vargs.add("org.apache.hadoop.mapreduce.v2.app.MRAppMaster");
+ vargs.add(String.valueOf(applicationId.clusterTimeStamp));
+ vargs.add(String.valueOf(applicationId.id));
+ vargs.add("1>logs/stderr");
+ vargs.add("2>logs/stdout");
+
+ Vector<CharSequence> vargsFinal = new Vector<CharSequence>(8);
+ // Final command
+ StringBuilder mergedCommand = new StringBuilder();
+ for (CharSequence str : vargs) {
+ mergedCommand.append(str).append(" ");
+ }
+ vargsFinal.add("mkdir logs;" + mergedCommand.toString());
+
+ LOG.info("Command to launch container for ApplicationMaster is : "
+ + mergedCommand);
+
+ appContext.command = vargsFinal;
+ // TODO: RM should get this from RPC.
+ appContext.user = UserGroupInformation.getCurrentUser().getShortUserName();
+ return appContext;
+ }
+
+ /**
+ * TODO: Copied for now from TaskAttemptImpl.java ... fixme
+ */
+ private void setupDistributedCache(Configuration conf,
+ ApplicationSubmissionContext container) throws IOException {
+
+ // Cache archives
+ parseDistributedCacheArtifacts(container, LocalResourceType.ARCHIVE,
+ DistributedCache.getCacheArchives(conf),
+ DistributedCache.getArchiveTimestamps(conf),
+ getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES),
+ DistributedCache.getArchiveVisibilities(conf),
+ DistributedCache.getArchiveClassPaths(conf));
+
+ // Cache files
+ parseDistributedCacheArtifacts(container, LocalResourceType.FILE,
+ DistributedCache.getCacheFiles(conf),
+ DistributedCache.getFileTimestamps(conf),
+ getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
+ DistributedCache.getFileVisibilities(conf),
+ DistributedCache.getFileClassPaths(conf));
+ }
+
+ // TODO - Move this to MR!
+ // Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[], long[], boolean[], Path[], FileType)
+ private static void parseDistributedCacheArtifacts(
+ ApplicationSubmissionContext container, LocalResourceType type,
+ URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[],
+ Path[] classpaths) throws IOException {
+
+ if (uris != null) {
+ // Sanity check
+ if ((uris.length != timestamps.length) || (uris.length != sizes.length) ||
+ (uris.length != visibilities.length)) {
+ throw new IllegalArgumentException("Invalid specification for " +
+ "distributed-cache artifacts of type " + type + " :" +
+ " #uris=" + uris.length +
+ " #timestamps=" + timestamps.length +
+ " #visibilities=" + visibilities.length
+ );
+ }
+
+ Map<String, Path> classPaths = new HashMap<String, Path>();
+ if (classpaths != null) {
+ for (Path p : classpaths) {
+ classPaths.put(p.toUri().getPath().toString(), p);
+ }
+ }
+ for (int i = 0; i < uris.length; ++i) {
+ URI u = uris[i];
+ Path p = new Path(u.toString());
+ // Add URI fragment or just the filename
+ Path name = new Path((null == u.getFragment())
+ ? p.getName()
+ : u.getFragment());
+ if (name.isAbsolute()) {
+ throw new IllegalArgumentException("Resource name must be relative");
+ }
+ container.resources_todo.put(
+ name.toUri().getPath(),
+ getLocalResource(
+ uris[i], type,
+ visibilities[i]
+ ? LocalResourceVisibility.PUBLIC
+ : LocalResourceVisibility.PRIVATE,
+ sizes[i], timestamps[i])
+ );
+ if (classPaths.containsKey(u.getPath())) {
+ MRApps.addToClassPath(container.environment, name.toUri().getPath());
+ }
+ }
+ }
+ }
+
+ // TODO - Move this to MR!
+ private static long[] getFileSizes(Configuration conf, String key) {
+ String[] strs = conf.getStrings(key);
+ if (strs == null) {
+ return null;
+ }
+ long[] result = new long[strs.length];
+ for(int i=0; i < strs.length; ++i) {
+ result[i] = Long.parseLong(strs[i]);
+ }
+ return result;
+ }
+
+ private static LocalResource getLocalResource(URI uri,
+ LocalResourceType type, LocalResourceVisibility visibility,
+ long size, long timestamp) {
+ LocalResource resource = new LocalResource();
+ resource.resource = AvroUtil.getYarnUrlFromURI(uri);
+ resource.type = type;
+ resource.state = visibility;
+ resource.size = size;
+ resource.timestamp = timestamp;
+ return resource;
+ }
+
+ @Override
+ public void setJobPriority(JobID arg0, String arg1) throws IOException,
+ InterruptedException {
+ resMgrDelegate.setJobPriority(arg0, arg1);
+ }
+
+ @Override
+ public long getProtocolVersion(String arg0, long arg1) throws IOException {
+ return resMgrDelegate.getProtocolVersion(arg0, arg1);
+ }
+
+ @Override
+ public long renewDelegationToken(Token<DelegationTokenIdentifier> arg0)
+ throws IOException, InterruptedException {
+ return resMgrDelegate.renewDelegationToken(arg0);
+ }
+
+
+ @Override
+ public Counters getJobCounters(JobID arg0) throws IOException,
+ InterruptedException {
+ return clientServiceDelegate.getJobCounters(arg0);
+ }
+
+ @Override
+ public String getJobHistoryDir() throws IOException, InterruptedException {
+ return clientServiceDelegate.getJobHistoryDir();
+ }
+
+ @Override
+ public JobStatus getJobStatus(JobID jobID) throws IOException,
+ InterruptedException {
+ JobStatus status = clientServiceDelegate.getJobStatus(jobID);
+ if (status.isJobComplete()) {
+ // Clean up the Container running the ApplicationMaster.
+ }
+ return status;
+ }
+
+ @Override
+ public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1,
+ int arg2) throws IOException, InterruptedException {
+ return clientServiceDelegate.getTaskCompletionEvents(arg0, arg1, arg2);
+ }
+
+ @Override
+ public String[] getTaskDiagnostics(TaskAttemptID arg0) throws IOException,
+ InterruptedException {
+ return clientServiceDelegate.getTaskDiagnostics(arg0);
+ }
+
+ @Override
+ public TaskReport[] getTaskReports(JobID jobID, TaskType taskType)
+ throws IOException, InterruptedException {
+ return clientServiceDelegate
+ .getTaskReports(jobID, taskType);
+ }
+
+ @Override
+ public void killJob(JobID arg0) throws IOException, InterruptedException {
+ clientServiceDelegate.killJob(arg0);
+ }
+
+ @Override
+ public boolean killTask(TaskAttemptID arg0, boolean arg1) throws IOException,
+ InterruptedException {
+ return clientServiceDelegate.killTask(arg0, arg1);
+ }
+
+ @Override
+ public AccessControlList getQueueAdmins(String arg0) throws IOException {
+ return new AccessControlList("*");
+ }
+}
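
The launch command submitJob() hands to YARN is assembled in
getApplicationSubmissionContext(): the java binary, the JVM options, the
MRAppMaster class, the two halves of the application id, and the shell
redirections are joined with spaces and prefixed with "mkdir logs;" so the
redirection targets have a directory to land in. A standalone sketch of that
assembly (the timestamp and id values below are illustrative):

    import java.util.Arrays;
    import java.util.List;

    public class AmCommandSketch {
      public static void main(String[] args) {
        List<String> vargs = Arrays.asList(
            "$JAVA_HOME/bin/java",
            "-Dhadoop.root.logger=DEBUG,console -Xmx1024m",
            "org.apache.hadoop.mapreduce.v2.app.MRAppMaster",
            "1300000000000",   // applicationId.clusterTimeStamp (illustrative)
            "1",               // applicationId.id (illustrative)
            "1>logs/stdout",
            "2>logs/stderr");
        StringBuilder merged = new StringBuilder();
        for (String s : vargs) {
          merged.append(s).append(' ');
        }
        System.out.println("mkdir logs;" + merged);
      }
    }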
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YarnClientFactory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YarnClientFactory.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YarnClientFactory.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YarnClientFactory.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,34 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.ClientFactory;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+
+public class YarnClientFactory extends ClientFactory {
+
+ @Override
+ protected ClientProtocol createClient(Configuration conf)
+ throws IOException {
+ return new YARNRunner(conf);
+ }
+ }
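
The factory itself is wired in through configuration: MiniMRYarnCluster
below sets "mapreduce.clientfactory.class.name" to YarnClientFactory, and
the framework instantiates whatever class that key names. ClientFactory's
lookup code is not part of this commit, so the following is only a sketch of
that mechanism under the stated assumption:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ReflectionUtils;

    public class FactorySelectionSketch {
      // Resolve the configured implementation class and instantiate it.
      static <T> T newInstance(Configuration conf, String key,
                               Class<T> base, Class<? extends T> dflt) {
        Class<? extends T> clazz = conf.getClass(key, dflt, base);
        return ReflectionUtils.newInstance(clazz, conf);
      }
    }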
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientRedirect.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,283 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.avro.ipc.Server;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.YARNRunner;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.ApplicationsManager;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ApplicationMaster;
+import org.apache.hadoop.yarn.ApplicationState;
+import org.apache.hadoop.yarn.ApplicationStatus;
+import org.apache.hadoop.yarn.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.ClientRMProtocol;
+import org.apache.hadoop.yarn.YarnClusterMetrics;
+import org.apache.hadoop.yarn.YarnRemoteException;
+import org.apache.hadoop.mapreduce.v2.api.CounterGroup;
+import org.apache.hadoop.mapreduce.v2.api.Counters;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.v2.api.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.TaskReport;
+import org.junit.Test;
+
+public class TestClientRedirect {
+
+ private static final Log LOG = LogFactory.getLog(TestClientRedirect.class);
+ private static final String RMADDRESS = "0.0.0.0:8054";
+ private static final String AMHOSTNAME = "0.0.0.0";
+ private static final int AMPORT = 10020;
+ private boolean firstRedirect = false;
+ private boolean secondRedirect = false;
+
+ @Test
+ public void testRedirect() throws Exception {
+
+ Configuration conf = new YarnConfiguration();
+ conf.set(YarnConfiguration.APPSMANAGER_ADDRESS, RMADDRESS);
+ conf.set("jobhistory.server.hostname", AMHOSTNAME);
+ conf.setInt("jobhistory.server.port", AMPORT);
+ RMService rmService = new RMService("test");
+ rmService.init(conf);
+ rmService.start();
+
+ MRClientProtocolService clientService =
+ new MRClientProtocolService();
+ clientService.init(conf);
+ clientService.start(conf);
+
+ LOG.info("services started");
+ YARNRunner yarnRunner = new YARNRunner(conf);
+ org.apache.hadoop.mapreduce.JobID jobID =
+ new org.apache.hadoop.mapred.JobID("201103121733", 1);
+ yarnRunner.getJobCounters(jobID);
+ Assert.assertTrue(firstRedirect);
+ Assert.assertTrue(secondRedirect);
+
+ rmService.stop();
+ clientService.stop();
+ }
+
+ class RMService extends AbstractService implements ClientRMProtocol {
+ private ApplicationsManager applicationsManager;
+ private String clientServiceBindAddress;
+ InetSocketAddress clientBindAddress;
+ private Server server;
+
+ public RMService(String name) {
+ super(name);
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ clientServiceBindAddress = RMADDRESS;
+ /*
+ clientServiceBindAddress = conf.get(
+ YarnConfiguration.APPSMANAGER_ADDRESS,
+ YarnConfiguration.DEFAULT_APPSMANAGER_BIND_ADDRESS);
+ */
+ clientBindAddress = NetUtils.createSocketAddr(clientServiceBindAddress);
+ super.init(conf);
+ }
+
+ @Override
+ public void start() {
+ // All the clients to appsManager are supposed to be authenticated via
+ // Kerberos if security is enabled, so no secretManager.
+ YarnRPC rpc = YarnRPC.create(getConfig());
+ Configuration clientServerConf = new Configuration(getConfig());
+ this.server = rpc.getServer(ClientRMProtocol.class, this,
+ clientBindAddress, clientServerConf, null);
+ this.server.start();
+ super.start();
+ }
+
+ @Override
+ public ApplicationID getNewApplicationId() throws AvroRemoteException {
+ return null;
+ }
+
+ @Override
+ public ApplicationMaster getApplicationMaster(ApplicationID applicationId)
+ throws AvroRemoteException {
+ ApplicationMaster master = new ApplicationMaster();
+ master.applicationId = applicationId;
+ master.status = new ApplicationStatus();
+ master.status.applicationId = applicationId;
+ if (firstRedirect == false) {
+ master.state = ApplicationState.RUNNING;
+ } else {
+ master.state = ApplicationState.COMPLETED;
+ }
+ master.host = AMHOSTNAME;
+ master.rpcPort = AMPORT;
+ return master;
+ }
+
+ @Override
+ public Void submitApplication(ApplicationSubmissionContext context)
+ throws AvroRemoteException {
+ throw new AvroRemoteException("Test");
+ }
+
+ @Override
+ public Void finishApplication(ApplicationID applicationId)
+ throws AvroRemoteException {
+ return null;
+ }
+
+ @Override
+ public YarnClusterMetrics getClusterMetrics() throws AvroRemoteException {
+ return null;
+ }
+ }
+
+ class MRClientProtocolService extends AbstractService
+ implements MRClientProtocol {
+ private InetSocketAddress bindAddress;
+ private Server server;
+
+ public MRClientProtocolService() {
+ super("TestClientService");
+ }
+
+ public void start(Configuration conf) {
+ YarnRPC rpc = YarnRPC.create(conf);
+ //TODO : use fixed port ??
+ InetSocketAddress address = NetUtils.createSocketAddr(AMHOSTNAME + ":" + AMPORT);
+ InetAddress hostNameResolved = null;
+ try {
+ address.getAddress();
+ hostNameResolved = InetAddress.getLocalHost();
+ } catch (UnknownHostException e) {
+ throw new YarnException(e);
+ }
+
+ server =
+ rpc.getServer(MRClientProtocol.class, this, address,
+ conf, null);
+ server.start();
+ this.bindAddress =
+ NetUtils.createSocketAddr(hostNameResolved.getHostAddress()
+ + ":" + server.getPort());
+ super.start();
+ }
+
+ public void stop() {
+ server.close();
+ super.stop();
+ }
+
+ @Override
+ public Counters getCounters(JobID jobID) throws AvroRemoteException,
+ YarnRemoteException {
+ if (firstRedirect == false) {
+ firstRedirect = true;
+ throw RPCUtil.getRemoteException(new IOException("Fail"));
+ }
+ else {
+ secondRedirect = true;
+ Counters counters = new Counters();
+ counters.groups = new HashMap<CharSequence, CounterGroup>();
+ return counters;
+ }
+ }
+
+ @Override
+ public List<CharSequence> getDiagnostics(
+ org.apache.hadoop.mapreduce.v2.api.TaskAttemptID taskAttemptID)
+ throws AvroRemoteException, YarnRemoteException {
+ return null;
+ }
+
+ @Override
+ public JobReport getJobReport(JobID jobID) throws AvroRemoteException,
+ YarnRemoteException {
+ return null;
+ }
+
+ @Override
+ public List<TaskAttemptCompletionEvent> getTaskAttemptCompletionEvents(
+ JobID jobID, int fromEventId, int maxEvents)
+ throws AvroRemoteException, YarnRemoteException {
+ return null;
+ }
+
+ @Override
+ public TaskAttemptReport getTaskAttemptReport(
+ org.apache.hadoop.mapreduce.v2.api.TaskAttemptID taskAttemptID)
+ throws AvroRemoteException, YarnRemoteException {
+ return null;
+ }
+
+ @Override
+ public TaskReport getTaskReport(org.apache.hadoop.mapreduce.v2.api.TaskID taskID)
+ throws AvroRemoteException, YarnRemoteException {
+ return null;
+ }
+
+ @Override
+ public List<TaskReport> getTaskReports(JobID jobID,
+ org.apache.hadoop.mapreduce.v2.api.TaskType taskType)
+ throws AvroRemoteException, YarnRemoteException {
+ return null;
+ }
+
+ @Override
+ public Void killJob(JobID jobID) throws AvroRemoteException,
+ YarnRemoteException {
+ return null;
+ }
+
+ @Override
+ public Void killTask(org.apache.hadoop.mapreduce.v2.api.TaskID taskID)
+ throws AvroRemoteException, YarnRemoteException {
+ return null;
+ }
+
+ @Override
+ public Void killTaskAttempt(
+ org.apache.hadoop.mapreduce.v2.api.TaskAttemptID taskAttemptID)
+ throws AvroRemoteException, YarnRemoteException {
+ return null;
+ }
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,111 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.ShuffleHandler;
+import org.apache.hadoop.mapred.YarnClientFactory;
+import org.apache.hadoop.mapreduce.ClientFactory;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.service.Service;
+
+/**
+ * Configures and starts the MR specific components in the YARN cluster.
+ *
+ */
+public class MiniMRYarnCluster extends MiniYARNCluster {
+
+ public static final String APPJAR =
+ "../hadoop-mapreduce-client-app/target/"
+ + YARNApplicationConstants.HADOOP_MAPREDUCE_CLIENT_APP_JAR_NAME;
+
+ private static final Log LOG = LogFactory.getLog(MiniMRYarnCluster.class);
+ private JobHistoryServer historyServer;
+
+ public MiniMRYarnCluster(String testName) {
+ super(testName);
+ //TODO: add the history server
+ //historyServer = new JobHistoryServerWrapper();
+ //addService(historyServer);
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ conf.setClass("mapreduce.clientfactory.class.name",
+ YarnClientFactory.class, ClientFactory.class);
+ conf.setStrings(YARNApplicationConstants.NM_HOSTS_CONF_KEY,
+ new String[] { NMConfig.DEFAULT_NM_BIND_ADDRESS });
+ conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name"));
+ conf.set(YARNApplicationConstants.APPS_STAGING_DIR_KEY, new File(
+ getTestWorkDir(),
+ "apps_staging_dir/${user.name}/").getAbsolutePath());
+ conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is "local", under
+ // which the shuffle doesn't happen
+
+ //configure the shuffle service in NM
+ conf.setStrings(AuxServices.AUX_SERVICES,
+ new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
+ conf.setClass(String.format(AuxServices.AUX_SERVICE_CLASS_FMT,
+ ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
+ Service.class);
+ super.init(conf);
+ }
+
+ private class JobHistoryServerWrapper extends AbstractService {
+ public JobHistoryServerWrapper() {
+ super(JobHistoryServerWrapper.class.getName());
+ }
+
+ @Override
+ public synchronized void start() {
+ try {
+ historyServer = new JobHistoryServer();
+ historyServer.init(getConfig());
+ new Thread() {
+ public void run() {
+ historyServer.start();
+ };
+ }.start();
+ while (historyServer.getServiceState() == STATE.INITED) {
+ LOG.info("Waiting for HistoryServer to start...");
+ Thread.sleep(1500);
+ }
+ if (historyServer.getServiceState() != STATE.STARTED) {
+ throw new IOException("HistoryServer failed to start");
+ }
+ super.start();
+ } catch (Throwable t) {
+ throw new YarnException(t);
+ }
+ }
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,220 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.FailingMapper;
+import org.apache.hadoop.RandomTextWriterJob;
+import org.apache.hadoop.SleepJob;
+import org.apache.hadoop.RandomTextWriterJob.RandomInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.YarnServerConfig;
+import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
+import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestMRJobs {
+
+ private static final Log LOG = LogFactory.getLog(TestMRJobs.class);
+
+ private static MiniMRYarnCluster mrCluster;
+
+ @Before
+ public void setup() throws InterruptedException, IOException {
+
+ if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+ LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + " not found. Not running test.");
+ return;
+ }
+
+ if (mrCluster == null) {
+ mrCluster = new MiniMRYarnCluster(getClass().getName());
+ mrCluster.init(new Configuration());
+ mrCluster.start();
+ }
+ }
+
+ @Test
+ public void testSleepJob() throws IOException, InterruptedException,
+ ClassNotFoundException {
+
+ if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+ LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + " not found. Not running test.");
+ return;
+ }
+
+ SleepJob sleepJob = new SleepJob();
+ sleepJob.setConf(mrCluster.getConfig());
+ //Job with 3 maps and 2 reduces
+ Job job = sleepJob.createJob(3, 2, 10000, 1, 5000, 1);
+ // TODO: We should not be setting MRAppJar as job.jar. It should be
+ // uploaded separately by YarnRunner.
+ job.setJar(new File(MiniMRYarnCluster.APPJAR).getAbsolutePath());
+ job.waitForCompletion(true);
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+ }
+
+ @Test
+ public void testRandomWriter() throws IOException, InterruptedException,
+ ClassNotFoundException {
+
+ if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+ LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + " not found. Not running test.");
+ return;
+ }
+
+ RandomTextWriterJob randomWriterJob = new RandomTextWriterJob();
+ mrCluster.getConfig().set(RandomTextWriterJob.TOTAL_BYTES, "3072");
+ mrCluster.getConfig().set(RandomTextWriterJob.BYTES_PER_MAP, "1024");
+ Job job = randomWriterJob.createJob(mrCluster.getConfig());
+ FileOutputFormat.setOutputPath(job, new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
+ "random-output"));
+ // TODO: We should not be setting MRAppJar as job.jar. It should be
+ // uploaded separately by YarnRunner.
+ job.setJar(new File(MiniMRYarnCluster.APPJAR).getAbsolutePath());
+ job.waitForCompletion(true);
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+ }
+
+ @Test
+ public void testFailingMapper() throws IOException, InterruptedException,
+ ClassNotFoundException {
+
+ if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+ LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + " not found. Not running test.");
+ return;
+ }
+
+ int numMaps = 1;
+ mrCluster.getConfig().setInt(MRJobConfig.NUM_MAPS, numMaps);
+
+ mrCluster.getConfig().setInt("mapreduce.task.timeout", 10*1000);//reduce the timeout
+ mrCluster.getConfig().setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 2); //reduce the no of attempts
+
+ Job job = new Job(mrCluster.getConfig());
+
+ job.setJarByClass(FailingMapper.class);
+ job.setJobName("failmapper");
+
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+
+ job.setInputFormatClass(RandomInputFormat.class);
+ job.setMapperClass(FailingMapper.class);
+
+ job.setOutputFormatClass(TextOutputFormat.class);
+ job.setNumReduceTasks(0);
+
+ FileOutputFormat.setOutputPath(job, new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
+ "failmapper-output"));
+ // TODO: We should not be setting MRAppJar as job.jar. It should be
+ // uploaded separately by YarnRunner.
+ job.setJar(new File(MiniMRYarnCluster.APPJAR).getAbsolutePath());
+ job.waitForCompletion(true);
+ TaskID taskID = new TaskID(job.getJobID(), TaskType.MAP, 0);
+ TaskAttemptID aId = new TaskAttemptID(taskID, 0);
+ System.out.println("Diagnostics for " + aId + " :");
+ for (String diag : job.getTaskDiagnostics(aId)) {
+ System.out.println(diag);
+ }
+ aId = new TaskAttemptID(taskID, 1);
+ System.out.println("Diagnostics for " + aId + " :");
+ for (String diag : job.getTaskDiagnostics(aId)) {
+ System.out.println(diag);
+ }
+
+ TaskCompletionEvent[] events = job.getTaskCompletionEvents(0, 2);
+ Assert.assertEquals(TaskCompletionEvent.Status.FAILED,
+ events[0].getStatus());
+ Assert.assertEquals(TaskCompletionEvent.Status.FAILED,
+ events[1].getStatus());
+ Assert.assertEquals(JobStatus.State.FAILED, job.getJobState());
+ }
+
+//@Test
+ public void testSleepJobWithSecurityOn() throws IOException,
+ InterruptedException, ClassNotFoundException {
+
+ if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+ return;
+ }
+
+ mrCluster.getConfig().set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+ "kerberos");
+ mrCluster.getConfig().set(RMConfig.RM_KEYTAB, "/etc/krb5.keytab");
+ mrCluster.getConfig().set(NMConfig.NM_KEYTAB, "/etc/krb5.keytab");
+ mrCluster.getConfig().set(YarnConfiguration.RM_SERVER_PRINCIPAL_KEY,
+ "rm/sightbusy-lx@LOCALHOST");
+ mrCluster.getConfig().set(YarnServerConfig.NM_SERVER_PRINCIPAL_KEY,
+ "nm/sightbusy-lx@LOCALHOST");
+ UserGroupInformation.setConfiguration(mrCluster.getConfig());
+
+ // Keep it in here instead of after RM/NM as multiple user logins happen in
+ // the same JVM.
+ UserGroupInformation user = UserGroupInformation.getCurrentUser();
+
+ LOG.info("User name is " + user.getUserName());
+ for (Token<? extends TokenIdentifier> str : user.getTokens()) {
+ LOG.info("Token is " + str.encodeToUrlString());
+ }
+ user.doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ SleepJob sleepJob = new SleepJob();
+ sleepJob.setConf(mrCluster.getConfig());
+ Job job = sleepJob.createJob(3, 0, 10000, 1, 0, 0);
+ // //Job with reduces
+ // Job job = sleepJob.createJob(3, 2, 10000, 1, 10000, 1);
+ // TODO: We should not be setting MRAppJar as job.jar. It should be
+ // uploaded separately by YarnRunner.
+ job.setJar(new File(MiniMRYarnCluster.APPJAR).getAbsolutePath());
+ job.waitForCompletion(true);
+ Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
+ return null;
+ }
+ });
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-shuffle/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-shuffle/pom.xml?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-shuffle/pom.xml (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-shuffle/pom.xml Thu Mar 17 20:21:13 2011
@@ -0,0 +1,39 @@
+<?xml version="1.0"?>
+<project>
+ <parent>
+ <artifactId>hadoop-mapreduce-client</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ <version>${yarn.version}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-shuffle</artifactId>
+ <name>hadoop-mapreduce-client-shuffle</name>
+ <version>${yarn.version}</version>
+ <url>http://maven.apache.org</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>yarn</artifactId>
+ <version>${yarn.version}</version>
+ <type>pom</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>yarn-server</artifactId>
+ <version>${yarn.version}</version>
+ <type>pom</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>yarn-server-nodemanager</artifactId>
+ <version>${yarn.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${yarn.version}</version>
+ </dependency>
+ </dependencies>
+
+</project>
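
A downstream module would pull this artifact in the same way this pom
declares its own dependencies (a sketch; the version is inherited from
${yarn.version} as elsewhere in this patch):

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-shuffle</artifactId>
      <version>${yarn.version}</version>
    </dependency>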
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,427 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.io.UnsupportedEncodingException;
+import java.net.URL;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.concurrent.Executors;
+import java.util.Map;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.crypto.SecretKey;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.mapred.IndexCache;
+
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelFutureProgressListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.DefaultFileRegion;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.FileRegion;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.codec.frame.TooLongFrameException;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.QueryStringDecoder;
+import org.jboss.netty.handler.stream.ChunkedWriteHandler;
+import org.jboss.netty.util.CharsetUtil;
+
+import static org.jboss.netty.buffer.ChannelBuffers.*;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.*;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.*;
+import static org.jboss.netty.handler.codec.http.HttpMethod.*;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
+import static org.jboss.netty.handler.codec.http.HttpVersion.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.DataOutputByteBuffer;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
+
+import org.apache.hadoop.yarn.ApplicationID;
+
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ApplicationLocalizer;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.AvroUtil;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+
+// DEBUG
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.log4j.Level;
+
+// TODO packaging
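+/**
+ * NodeManager auxiliary service that serves map outputs to reduce tasks
+ * over HTTP. It registers a per-job secret from the job token so fetches
+ * can be verified, and streams on-disk map output through Netty using
+ * zero-copy file regions.
+ */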
+public class ShuffleHandler extends AbstractService
+ implements AuxServices.AuxiliaryService {
+
+ private static final Log LOG = LogFactory.getLog(ShuffleHandler.class);
+ static {
+ //DEBUG
+ ((Log4JLogger)LOG).getLogger().setLevel(Level.DEBUG);
+ }
+
+ private int port;
+ private ChannelFactory selector;
+ private final ChannelGroup accepted = new DefaultChannelGroup();
+
+ public static final String MAPREDUCE_SHUFFLE_SERVICEID =
+ "mapreduce.shuffle";
+
+ private static final Map<String,String> userRsrc =
+ new ConcurrentHashMap<String,String>();
+ private static final JobTokenSecretManager secretManager =
+ new JobTokenSecretManager();
+
+ public static final String SHUFFLE_PORT = "mapreduce.shuffle.port";
+
+ public ShuffleHandler() {
+ super("httpshuffle");
+ }
+
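+ /**
+ * Deserializes the job token shipped in the secret buffer and registers
+ * it with the secret manager so shuffle requests for this job can be
+ * verified; also records the user owning the job's local resources.
+ */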
+ @Override
+ public void initApp(String user, ApplicationID appId, ByteBuffer secret) {
+ // TODO these bytes should be versioned
+ try {
+ DataInputByteBuffer in = new DataInputByteBuffer();
+ in.reset(secret);
+ Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
+ jt.readFields(in);
+ // TODO: Once Shuffle is out of NM, this can use MR APIs
+ JobID jobId = new JobID(Long.toString(appId.clusterTimeStamp), appId.id);
+ userRsrc.put(jobId.toString(), user);
+ LOG.info("Added token for " + jobId.toString());
+ secretManager.addTokenForJob(jobId.toString(), jt);
+ } catch (IOException e) {
+ LOG.error("Error during initApp", e);
+ // TODO add API to AuxiliaryServices to report failures
+ }
+ }
+
+ @Override
+ public void stopApp(ApplicationID appId) {
+ JobID jobId = new JobID(Long.toString(appId.clusterTimeStamp), appId.id);
+ secretManager.removeTokenForJob(jobId.toString());
+ }
+
+ @Override
+ public synchronized void init(Configuration conf) {
+ selector = new NioServerSocketChannelFactory(
+ Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+ super.init(new Configuration(conf));
+ }
+
+ // TODO change AbstractService to throw InterruptedException
+ @Override
+ public synchronized void start() {
+ Configuration conf = getConfig();
+ ServerBootstrap bootstrap = new ServerBootstrap(selector);
+ bootstrap.setPipelineFactory(new HttpPipelineFactory(conf));
+ port = conf.getInt(SHUFFLE_PORT, 8080);
+ accepted.add(bootstrap.bind(new InetSocketAddress(port)));
+ LOG.info(getName() + " listening on port " + port);
+ super.start();
+ }
+
+ @Override
+ public synchronized void stop() {
+ accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
+ selector.releaseExternalResources();
+ super.stop();
+ }
+
+ public static class HttpPipelineFactory implements ChannelPipelineFactory {
+
+ private final Shuffle SHUFFLE;
+
+ public HttpPipelineFactory(Configuration conf) {
+ SHUFFLE = new Shuffle(conf);
+ }
+
+ public ChannelPipeline getPipeline() throws Exception {
+ return Channels.pipeline(
+ new HttpRequestDecoder(),
+ new HttpChunkAggregator(1 << 16),
+ new HttpResponseEncoder(),
+ new ChunkedWriteHandler(),
+ SHUFFLE);
+ // TODO factor security manager into pipeline
+ // TODO factor out encode/decode to permit binary shuffle
+ // TODO factor out decode of index to permit alt. models
+ }
+
+ }
+
+ static class Shuffle extends SimpleChannelUpstreamHandler {
+
+ private final Configuration conf;
+ private final IndexCache indexCache;
+ private final LocalDirAllocator lDirAlloc =
+ new LocalDirAllocator(NMConfig.NM_LOCAL_DIR);
+
+ public Shuffle(Configuration conf) {
+ this.conf = conf;
+ indexCache = new IndexCache(new JobConf(conf));
+ }
+
+ private static List<String> splitMaps(List<String> mapq) {
+ if (null == mapq) {
+ return null;
+ }
+ final List<String> ret = new ArrayList<String>();
+ for (String s : mapq) {
+ Collections.addAll(ret, s.split(","));
+ }
+ return ret;
+ }
+
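+ /**
+ * Handles a shuffle fetch: expects a GET whose query string carries
+ * job, reduce and one or more comma-separated map parameters, verifies
+ * the request against the job token, and streams each requested map
+ * output back on the channel.
+ */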
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
+ throws Exception {
+ HttpRequest request = (HttpRequest) evt.getMessage();
+ if (request.getMethod() != GET) {
+ sendError(ctx, METHOD_NOT_ALLOWED);
+ return;
+ }
+ final Map<String,List<String>> q =
+ new QueryStringDecoder(request.getUri()).getParameters();
+ final List<String> mapIds = splitMaps(q.get("map"));
+ final List<String> reduceQ = q.get("reduce");
+ final List<String> jobQ = q.get("job");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("RECV: " + request.getUri() +
+ "\n mapId: " + mapIds +
+ "\n reduceId: " + reduceQ +
+ "\n jobId: " + jobQ);
+ }
+
+ if (mapIds == null || reduceQ == null || jobQ == null) {
+ sendError(ctx, "Required param job, map and reduce", BAD_REQUEST);
+ return;
+ }
+ if (reduceQ.size() != 1 || jobQ.size() != 1) {
+ sendError(ctx, "Too many job/reduce parameters", BAD_REQUEST);
+ return;
+ }
+ int reduceId;
+ String jobId;
+ try {
+ reduceId = Integer.parseInt(reduceQ.get(0));
+ jobId = jobQ.get(0);
+ } catch (NumberFormatException e) {
+ sendError(ctx, "Bad reduce parameter", BAD_REQUEST);
+ return;
+ } catch (IllegalArgumentException e) {
+ sendError(ctx, "Bad job parameter", BAD_REQUEST);
+ return;
+ }
+ final String reqUri = request.getUri();
+ if (null == reqUri) {
+ // TODO? add upstream?
+ sendError(ctx, FORBIDDEN);
+ return;
+ }
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+ try {
+ verifyRequest(jobId, ctx, request, response,
+ new URL("http", "", 8080, reqUri));
+ } catch (IOException e) {
+ LOG.warn("Shuffle failure ", e);
+ sendError(ctx, e.getMessage(), UNAUTHORIZED);
+ return;
+ }
+
+ Channel ch = evt.getChannel();
+ ch.write(response);
+ // TODO refactor the following into the pipeline
+ ChannelFuture lastMap = null;
+ for (String mapId : mapIds) {
+ try {
+ lastMap =
+ sendMapOutput(ctx, ch, userRsrc.get(jobId), jobId, mapId, reduceId);
+ if (null == lastMap) {
+ sendError(ctx, NOT_FOUND);
+ return;
+ }
+ } catch (IOException e) {
+ LOG.error("Shuffle error ", e);
+ sendError(ctx, e.getMessage(), INTERNAL_SERVER_ERROR);
+ return;
+ }
+ }
+ if (lastMap != null) {
+ lastMap.addListener(ChannelFutureListener.CLOSE);
+ }
+ }
+
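+ /**
+ * Verifies the fetcher-supplied URL hash against the job token secret
+ * and, on success, sets a reply hash header so the fetcher can in turn
+ * authenticate this server.
+ */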
+ private void verifyRequest(String appid, ChannelHandlerContext ctx,
+ HttpRequest request, HttpResponse response, URL requestUri)
+ throws IOException {
+ SecretKey tokenSecret = secretManager.retrieveTokenSecret(appid);
+ if (null == tokenSecret) {
+ LOG.info("Request for unknown token " + appid);
+ throw new IOException("could not find jobid");
+ }
+ // string to encrypt
+ String enc_str = SecureShuffleUtils.buildMsgFrom(requestUri);
+ // hash from the fetcher
+ String urlHashStr =
+ request.getHeader(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
+ if (urlHashStr == null) {
+ LOG.info("Missing header hash for " + appid);
+ throw new IOException("fetcher cannot be authenticated");
+ }
+ if (LOG.isDebugEnabled()) {
+ int len = urlHashStr.length();
+ LOG.debug("verifying request. enc_str=" + enc_str + "; hash=..." +
+ urlHashStr.substring(len-len/2, len-1));
+ }
+ // verify - throws exception
+ SecureShuffleUtils.verifyReply(urlHashStr, enc_str, tokenSecret);
+ // verification passed - encode the reply
+ String reply =
+ SecureShuffleUtils.generateHash(urlHashStr.getBytes(), tokenSecret);
+ response.setHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
+ if (LOG.isDebugEnabled()) {
+ int len = reply.length();
+ LOG.debug("Fetcher request verfied. enc_str=" + enc_str + ";reply=" +
+ reply.substring(len-len/2, len-1));
+ }
+ }
+
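+ /**
+ * Locates the map output file and its index record under the
+ * application's local dirs, writes the ShuffleHeader, and sends the
+ * requested partition as a zero-copy FileRegion; returns null if the
+ * spill file is missing.
+ */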
+ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch,
+ String user, String jobId, String mapId, int reduce)
+ throws IOException {
+ // TODO replace w/ rsrc alloc
+ // $x/$user/appcache/$appId/output/$mapId
+ // TODO: Once Shuffle is out of NM, this can use MR APIs to convert between App and Job
+ JobID jobID = JobID.forName(jobId);
+ ApplicationID appID = new ApplicationID();
+ appID.clusterTimeStamp = Long.parseLong(jobID.getJtIdentifier());
+ appID.id = jobID.getId();
+ final String base =
+ ApplicationLocalizer.USERCACHE + "/" + user + "/"
+ + ApplicationLocalizer.APPCACHE + "/"
+ + AvroUtil.toString(appID) + "/output" + "/" + mapId;
+ LOG.debug("DEBUG0 " + base);
+ // Index file
+ Path indexFileName = lDirAlloc.getLocalPathToRead(
+ base + "/file.out.index", conf);
+ // Map-output file
+ Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
+ base + "/file.out", conf);
+ LOG.debug("DEBUG1 " + base + " : " + mapOutputFileName + " : " +
+ indexFileName);
+ IndexRecord info =
+ indexCache.getIndexInformation(mapId, reduce, indexFileName);
+ final ShuffleHeader header =
+ new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce);
+ final DataOutputBuffer dob = new DataOutputBuffer();
+ header.write(dob);
+ ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ File spillfile = new File(mapOutputFileName.toString());
+ RandomAccessFile spill;
+ try {
+ spill = new RandomAccessFile(spillfile, "r");
+ } catch (FileNotFoundException e) {
+ LOG.info(spillfile + " not found");
+ return null;
+ }
+ final FileRegion partition = new DefaultFileRegion(
+ spill.getChannel(), info.startOffset, info.partLength);
+ ChannelFuture writeFuture = ch.write(partition);
+ writeFuture.addListener(new ChannelFutureListener() {
+ // TODO error handling; distinguish IO/connection failures,
+ // attribute to appropriate spill output
+ @Override
+ public void operationComplete(ChannelFuture future) {
+ partition.releaseExternalResources();
+ }
+ });
+ return writeFuture;
+ }
+
+ private void sendError(ChannelHandlerContext ctx,
+ HttpResponseStatus status) {
+ sendError(ctx, "", status);
+ }
+
+ private void sendError(ChannelHandlerContext ctx, String message,
+ HttpResponseStatus status) {
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
+ response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
+ response.setContent(
+ ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
+
+ // Close the connection as soon as the error message is sent.
+ ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+ throws Exception {
+ Channel ch = e.getChannel();
+ Throwable cause = e.getCause();
+ if (cause instanceof TooLongFrameException) {
+ sendError(ctx, BAD_REQUEST);
+ return;
+ }
+
+ LOG.error("Shuffle error: ", cause);
+ if (ch.isConnected()) {
+ sendError(ctx, INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ }
+}
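
For reference, a fetch against this handler looks like the following (a
sketch: host, port and IDs are illustrative, and the path component is not
interpreted by the handler, which reads only the query parameters):

    GET http://<nm-host>:8080/mapOutput?job=job_1300392857041_0001&map=attempt_1300392857041_0001_m_000000_0&reduce=0

The request must also carry the URL-hash header that verifyRequest() checks
(SecureShuffleUtils.HTTP_HEADER_URL_HASH), computed from the job token
secret.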