You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/04/01 00:23:34 UTC
svn commit: r1087462 [2/20] - in /hadoop/mapreduce/branches/MR-279: ./
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/
mr-client/h...
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Thu Mar 31 22:23:22 2011
@@ -36,8 +36,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -67,8 +67,8 @@ public class JobHistoryEventHandler exte
private static final Log LOG = LogFactory.getLog(
JobHistoryEventHandler.class);
- private static final Map<JobID, MetaInfo> fileMap =
- Collections.<JobID,MetaInfo>synchronizedMap(new HashMap<JobID,MetaInfo>());
+ private static final Map<JobId, MetaInfo> fileMap =
+ Collections.<JobId,MetaInfo>synchronizedMap(new HashMap<JobId,MetaInfo>());
static final FsPermission HISTORY_DIR_PERMISSION =
FsPermission.createImmutable((short) 0750); // rwxr-x---
@@ -159,7 +159,7 @@ public class JobHistoryEventHandler exte
* @param jobId
* @throws IOException
*/
- protected void setupEventWriter(JobID jobId)
+ protected void setupEventWriter(JobId jobId)
throws IOException {
if (logDirPath == null) {
throw new IOException("Missing Log Directory for History");
@@ -195,7 +195,7 @@ public class JobHistoryEventHandler exte
/** Close the event writer for this id
* @throws IOException */
- public void closeWriter(JobID id) throws IOException {
+ public void closeWriter(JobId id) throws IOException {
try {
final MetaInfo mi = fileMap.get(id);
if (mi != null) {
@@ -254,7 +254,7 @@ public class JobHistoryEventHandler exte
}
}
- protected void closeEventWriter(JobID jobId) throws IOException {
+ protected void closeEventWriter(JobId jobId) throws IOException {
final MetaInfo mi = fileMap.get(jobId);
try {
Path logFile = mi.getHistoryFile();
@@ -318,7 +318,7 @@ public class JobHistoryEventHandler exte
/**
* Get the job history file path
*/
- public static Path getJobHistoryFile(Path dir, JobID jobId) {
+ public static Path getJobHistoryFile(Path dir, JobId jobId) {
return new Path(dir, TypeConverter.fromYarn(jobId).toString());
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java Thu Mar 31 22:23:22 2011
@@ -20,23 +20,24 @@ package org.apache.hadoop.mapreduce.v2.a
import java.util.Map;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
+
/**
* Context interface for sharing information across components in YARN App.
*/
public interface AppContext {
- ApplicationID getApplicationID();
+ ApplicationId getApplicationID();
CharSequence getUser();
- Job getJob(JobID jobID);
+ Job getJob(JobId jobID);
- Map<JobID, Job> getAllJobs();
+ Map<JobId, Job> getAllJobs();
EventHandler getEventHandler();
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Thu Mar 31 22:23:22 2011
@@ -37,7 +37,7 @@ import org.apache.hadoop.mapreduce.jobhi
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
@@ -64,13 +64,14 @@ import org.apache.hadoop.security.Creden
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.ApplicationID;
import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
@@ -98,7 +99,7 @@ public class MRAppMaster extends Composi
private final Clock clock;
- private ApplicationID appID;
+ private ApplicationId appID;
private AppContext context;
private Dispatcher dispatcher;
private ClientService clientService;
@@ -110,11 +111,11 @@ public class MRAppMaster extends Composi
private JobTokenSecretManager jobTokenSecretManager =
new JobTokenSecretManager();
- public MRAppMaster(ApplicationID applicationId) {
+ public MRAppMaster(ApplicationId applicationId) {
this(applicationId, null);
}
- public MRAppMaster(ApplicationID applicationId, Clock clock) {
+ public MRAppMaster(ApplicationId applicationId, Clock clock) {
super(MRAppMaster.class.getName());
if (clock == null) {
clock = new Clock();
@@ -259,7 +260,7 @@ public class MRAppMaster extends Composi
return new MRClientService(context);
}
- public ApplicationID getAppID() {
+ public ApplicationId getAppID() {
return appID;
}
@@ -290,20 +291,20 @@ public class MRAppMaster extends Composi
class RunningAppContext implements AppContext {
- private Map<JobID, Job> jobs = new ConcurrentHashMap<JobID, Job>();
+ private Map<JobId, Job> jobs = new ConcurrentHashMap<JobId, Job>();
@Override
- public ApplicationID getApplicationID() {
+ public ApplicationId getApplicationID() {
return appID;
}
@Override
- public Job getJob(JobID jobID) {
+ public Job getJob(JobId jobID) {
return jobs.get(jobID);
}
@Override
- public Map<JobID, Job> getAllJobs() {
+ public Map<JobId, Job> getAllJobs() {
return jobs;
}
@@ -414,7 +415,7 @@ public class MRAppMaster extends Composi
private class TaskEventDispatcher implements EventHandler<TaskEvent> {
@Override
public void handle(TaskEvent event) {
- Task task = context.getJob(event.getTaskID().jobID).getTask(
+ Task task = context.getJob(event.getTaskID().getJobId()).getTask(
event.getTaskID());
((EventHandler<TaskEvent>)task).handle(event);
}
@@ -424,8 +425,8 @@ public class MRAppMaster extends Composi
implements EventHandler<TaskAttemptEvent> {
@Override
public void handle(TaskAttemptEvent event) {
- Job job = context.getJob(event.getTaskAttemptID().taskID.jobID);
- Task task = job.getTask(event.getTaskAttemptID().taskID);
+ Job job = context.getJob(event.getTaskAttemptID().getTaskId().getJobId());
+ Task task = job.getTask(event.getTaskAttemptID().getTaskId());
TaskAttempt attempt = task.getAttempt(event.getTaskAttemptID());
((EventHandler<TaskAttemptEvent>) attempt).handle(event);
}
@@ -434,9 +435,10 @@ public class MRAppMaster extends Composi
public static void main(String[] args) {
try {
//Configuration.addDefaultResource("job.xml");
- ApplicationID applicationId = new ApplicationID();
- applicationId.clusterTimeStamp = Long.valueOf(args[0]);
- applicationId.id = Integer.valueOf(args[1]);
+ ApplicationId applicationId = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class);
+
+ applicationId.setClusterTimestamp(Long.valueOf(args[0]));
+ applicationId.setId(Integer.valueOf(args[1]));
MRAppMaster appMaster = new MRAppMaster(applicationId);
YarnConfiguration conf = new YarnConfiguration(new JobConf());
conf.addResource(new Path(YARNApplicationConstants.JOB_CONF_FILE));
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java Thu Mar 31 22:23:22 2011
@@ -22,14 +22,14 @@ import java.net.InetSocketAddress;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.WrappedJvmID;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
public interface TaskAttemptListener {
InetSocketAddress getAddress();
- void register(TaskAttemptID attemptID, Task task, WrappedJvmID jvmID);
+ void register(TaskAttemptId attemptID, Task task, WrappedJvmID jvmID);
- void unregister(TaskAttemptID attemptID, WrappedJvmID jvmID);
+ void unregister(TaskAttemptId attemptID, WrappedJvmID jvmID);
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java Thu Mar 31 22:23:22 2011
@@ -25,12 +25,13 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+
/**
* This class keeps track of tasks that have already been launched. It
@@ -50,8 +51,8 @@ public class TaskHeartbeatHandler extend
private EventHandler eventHandler;
- private Map<TaskAttemptID, Long> runningAttempts
- = new HashMap<TaskAttemptID, Long>();
+ private Map<TaskAttemptId, Long> runningAttempts
+ = new HashMap<TaskAttemptId, Long>();
public TaskHeartbeatHandler(EventHandler eventHandler) {
super("TaskHeartbeatHandler");
@@ -78,18 +79,18 @@ public class TaskHeartbeatHandler extend
super.stop();
}
- public synchronized void receivedPing(TaskAttemptID attemptID) {
+ public synchronized void receivedPing(TaskAttemptId attemptID) {
//only put for the registered attempts
if (runningAttempts.containsKey(attemptID)) {
runningAttempts.put(attemptID, System.currentTimeMillis());
}
}
- public synchronized void register(TaskAttemptID attemptID) {
+ public synchronized void register(TaskAttemptId attemptID) {
runningAttempts.put(attemptID, System.currentTimeMillis());
}
- public synchronized void unregister(TaskAttemptID attemptID) {
+ public synchronized void unregister(TaskAttemptId attemptID) {
runningAttempts.remove(attemptID);
}
@@ -99,14 +100,14 @@ public class TaskHeartbeatHandler extend
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
synchronized (TaskHeartbeatHandler.this) {
- Iterator<Map.Entry<TaskAttemptID, Long>> iterator =
+ Iterator<Map.Entry<TaskAttemptId, Long>> iterator =
runningAttempts.entrySet().iterator();
//avoid calculating current time everytime in loop
long currentTime = System.currentTimeMillis();
while (iterator.hasNext()) {
- Map.Entry<TaskAttemptID, Long> entry = iterator.next();
+ Map.Entry<TaskAttemptId, Long> entry = iterator.next();
if (currentTime > entry.getValue() + taskTimeOut) {
//task is lost, remove from the list and raise lost event
iterator.remove();
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java Thu Mar 31 22:23:22 2011
@@ -21,28 +21,42 @@ package org.apache.hadoop.mapreduce.v2.a
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.List;
-import org.apache.avro.ipc.AvroRemoteException;
import org.apache.avro.ipc.Server;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.mapreduce.v2.api.Counters;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
-import org.apache.hadoop.mapreduce.v2.api.JobReport;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptReport;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
-import org.apache.hadoop.mapreduce.v2.api.TaskReport;
-import org.apache.hadoop.mapreduce.v2.api.TaskType;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
@@ -59,6 +73,9 @@ import org.apache.hadoop.security.Securi
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
@@ -151,7 +168,9 @@ public class MRClientService extends Abs
class MRClientProtocolHandler implements MRClientProtocol {
- private Job verifyAndGetJob(JobID jobID) throws AvroRemoteException {
+ private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+
+ private Job verifyAndGetJob(JobId jobID) throws YarnRemoteException {
Job job = appContext.getJob(jobID);
if (job == null) {
throw RPCUtil.getRemoteException("Unknown job " + jobID);
@@ -159,17 +178,17 @@ public class MRClientService extends Abs
return job;
}
- private Task verifyAndGetTask(TaskID taskID) throws AvroRemoteException {
- Task task = verifyAndGetJob(taskID.jobID).getTask(taskID);
+ private Task verifyAndGetTask(TaskId taskID) throws YarnRemoteException {
+ Task task = verifyAndGetJob(taskID.getJobId()).getTask(taskID);
if (task == null) {
throw RPCUtil.getRemoteException("Unknown Task " + taskID);
}
return task;
}
- private TaskAttempt verifyAndGetAttempt(TaskAttemptID attemptID)
- throws AvroRemoteException {
- TaskAttempt attempt = verifyAndGetTask(attemptID.taskID).getAttempt(attemptID);
+ private TaskAttempt verifyAndGetAttempt(TaskAttemptId attemptID)
+ throws YarnRemoteException {
+ TaskAttempt attempt = verifyAndGetTask(attemptID.getTaskId()).getAttempt(attemptID);
if (attempt == null) {
throw RPCUtil.getRemoteException("Unknown TaskAttempt " + attemptID);
}
@@ -177,94 +196,121 @@ public class MRClientService extends Abs
}
@Override
- public Counters getCounters(JobID jobID) throws AvroRemoteException {
- Job job = verifyAndGetJob(jobID);
- return job.getCounters();
- }
-
- @Override
- public JobReport getJobReport(JobID jobID) throws AvroRemoteException {
- Job job = verifyAndGetJob(jobID);
- return job.getReport();
+ public GetCountersResponse getCounters(GetCountersRequest request) throws YarnRemoteException {
+ JobId jobId = request.getJobId();
+ Job job = verifyAndGetJob(jobId);
+ GetCountersResponse response = recordFactory.newRecordInstance(GetCountersResponse.class);
+ response.setCounters(job.getCounters());
+ return response;
+ }
+
+ @Override
+ public GetJobReportResponse getJobReport(GetJobReportRequest request) throws YarnRemoteException {
+ JobId jobId = request.getJobId();
+ Job job = verifyAndGetJob(jobId);
+ GetJobReportResponse response = recordFactory.newRecordInstance(GetJobReportResponse.class);
+ response.setJobReport(job.getReport());
+ return response;
+ }
+
+
+ @Override
+ public GetTaskAttemptReportResponse getTaskAttemptReport(GetTaskAttemptReportRequest request) throws YarnRemoteException {
+ TaskAttemptId taskAttemptId = request.getTaskAttemptId();
+ GetTaskAttemptReportResponse response = recordFactory.newRecordInstance(GetTaskAttemptReportResponse.class);
+ response.setTaskAttemptReport(verifyAndGetAttempt(taskAttemptId).getReport());
+ return response;
}
@Override
- public TaskAttemptReport getTaskAttemptReport(TaskAttemptID taskAttemptID)
- throws AvroRemoteException {
- return verifyAndGetAttempt(taskAttemptID).getReport();
+ public GetTaskReportResponse getTaskReport(GetTaskReportRequest request) throws YarnRemoteException {
+ TaskId taskId = request.getTaskId();
+ GetTaskReportResponse response = recordFactory.newRecordInstance(GetTaskReportResponse.class);
+ response.setTaskReport(verifyAndGetTask(taskId).getReport());
+ return response;
}
@Override
- public TaskReport getTaskReport(TaskID taskID) throws AvroRemoteException {
- return verifyAndGetTask(taskID).getReport();
- }
-
- @Override
- public List<TaskAttemptCompletionEvent> getTaskAttemptCompletionEvents(JobID jobID,
- int fromEventId, int maxEvents) throws AvroRemoteException {
- Job job = verifyAndGetJob(jobID);
- return Arrays.asList(job.getTaskAttemptCompletionEvents(fromEventId, maxEvents));
- }
-
- @Override
- public Void killJob(JobID jobID) throws AvroRemoteException {
- LOG.info("Kill Job received from client " + jobID);
- verifyAndGetJob(jobID);
+ public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(GetTaskAttemptCompletionEventsRequest request) throws YarnRemoteException {
+ JobId jobId = request.getJobId();
+ int fromEventId = request.getFromEventId();
+ int maxEvents = request.getMaxEvents();
+ Job job = verifyAndGetJob(jobId);
+
+ GetTaskAttemptCompletionEventsResponse response = recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsResponse.class);
+ response.addAllCompletionEvents(Arrays.asList(job.getTaskAttemptCompletionEvents(fromEventId, maxEvents)));
+ return response;
+ }
+
+ @Override
+ public KillJobResponse killJob(KillJobRequest request) throws YarnRemoteException {
+ JobId jobId = request.getJobId();
+ LOG.info("Kill Job received from client " + jobId);
+ verifyAndGetJob(jobId);
appContext.getEventHandler().handle(
- new JobEvent(jobID, JobEventType.JOB_KILL));
- return null;
+ new JobEvent(jobId, JobEventType.JOB_KILL));
+ KillJobResponse response = recordFactory.newRecordInstance(KillJobResponse.class);
+ return response;
}
@Override
- public Void killTask(TaskID taskID) throws AvroRemoteException {
- LOG.info("Kill task received from client " + taskID);
- verifyAndGetTask(taskID);
+ public KillTaskResponse killTask(KillTaskRequest request) throws YarnRemoteException {
+ TaskId taskId = request.getTaskId();
+ LOG.info("Kill task received from client " + taskId);
+ verifyAndGetTask(taskId);
appContext.getEventHandler().handle(
- new TaskEvent(taskID, TaskEventType.T_KILL));
- return null;
+ new TaskEvent(taskId, TaskEventType.T_KILL));
+ KillTaskResponse response = recordFactory.newRecordInstance(KillTaskResponse.class);
+ return response;
+ }
+
+ @Override
+ public KillTaskAttemptResponse killTaskAttempt(KillTaskAttemptRequest request) throws YarnRemoteException {
+ TaskAttemptId taskAttemptId = request.getTaskAttemptId();
+ LOG.info("Kill task attempt received from client " + taskAttemptId);
+ verifyAndGetAttempt(taskAttemptId);
+ appContext.getEventHandler().handle(
+ new TaskAttemptEvent(taskAttemptId,
+ TaskAttemptEventType.TA_KILL));
+ KillTaskAttemptResponse response = recordFactory.newRecordInstance(KillTaskAttemptResponse.class);
+ return response;
}
@Override
- public Void killTaskAttempt(TaskAttemptID taskAttemptID)
- throws AvroRemoteException {
- LOG.info("Kill task attempt received from client " + taskAttemptID);
- verifyAndGetAttempt(taskAttemptID);
- appContext.getEventHandler().handle(
- new TaskAttemptEvent(taskAttemptID,
- TaskAttemptEventType.TA_KILL));
- return null;
+ public GetDiagnosticsResponse getDiagnostics(GetDiagnosticsRequest request) throws YarnRemoteException {
+ TaskAttemptId taskAttemptId = request.getTaskAttemptId();
+
+ GetDiagnosticsResponse response = recordFactory.newRecordInstance(GetDiagnosticsResponse.class);
+ response.addAllDiagnostics(verifyAndGetAttempt(taskAttemptId).getDiagnostics());
+ return response;
}
@Override
- public Void failTaskAttempt(TaskAttemptID taskAttemptID)
- throws AvroRemoteException {
- LOG.info("Fail task attempt received from client " + taskAttemptID);
- verifyAndGetAttempt(taskAttemptID);
+ public FailTaskAttemptResponse failTaskAttempt(FailTaskAttemptRequest request) throws YarnRemoteException {
+ TaskAttemptId taskAttemptId = request.getTaskAttemptId();
+ LOG.info("Fail task attempt received from client " + taskAttemptId);
+ verifyAndGetAttempt(taskAttemptId);
appContext.getEventHandler().handle(
- new TaskAttemptEvent(taskAttemptID,
+ new TaskAttemptEvent(taskAttemptId,
TaskAttemptEventType.TA_FAILMSG));
return null;
}
@Override
- public List<CharSequence> getDiagnostics(TaskAttemptID taskAttemptID)
- throws AvroRemoteException {
- return verifyAndGetAttempt(taskAttemptID).getDiagnostics();
- }
-
- @Override
- public List<TaskReport> getTaskReports(JobID jobID, TaskType taskType)
- throws AvroRemoteException {
- Job job = verifyAndGetJob(jobID);
- LOG.info("Getting task report for " + taskType + " " + jobID);
- List<TaskReport> reports = new ArrayList<TaskReport>();
+ public GetTaskReportsResponse getTaskReports(GetTaskReportsRequest request) throws YarnRemoteException {
+ JobId jobId = request.getJobId();
+ TaskType taskType = request.getTaskType();
+
+ GetTaskReportsResponse response = recordFactory.newRecordInstance(GetTaskReportsResponse.class);
+
+ Job job = verifyAndGetJob(jobId);
+ LOG.info("Getting task report for " + taskType + " " + jobId);
Collection<Task> tasks = job.getTasks(taskType).values();
LOG.info("Getting task report size " + tasks.size());
for (Task task : tasks) {
- reports.add(task.getReport());
- }
- return reports;
+ response.addTaskReport(task.getReport());
+ }
+ return response;
}
-
}
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java Thu Mar 31 22:23:22 2011
@@ -21,27 +21,28 @@ package org.apache.hadoop.mapreduce.v2.a
import java.util.List;
import java.util.Map;
-import org.apache.hadoop.mapreduce.v2.api.Counters;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
-import org.apache.hadoop.mapreduce.v2.api.JobReport;
-import org.apache.hadoop.mapreduce.v2.api.JobState;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
-import org.apache.hadoop.mapreduce.v2.api.TaskType;
+import org.apache.hadoop.mapreduce.v2.api.records.Counters;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+
/**
* Main interface to interact with the job. Provides only getters.
*/
public interface Job {
- JobID getID();
- CharSequence getName();
+ JobId getID();
+ String getName();
JobState getState();
JobReport getReport();
Counters getCounters();
- Map<TaskID,Task> getTasks();
- Map<TaskID,Task> getTasks(TaskType taskType);
- Task getTask(TaskID taskID);
+ Map<TaskId,Task> getTasks();
+ Map<TaskId,Task> getTasks(TaskType taskType);
+ Task getTask(TaskId taskID);
List<String> getDiagnostics();
int getTotalMaps();
int getTotalReduces();
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Task.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Task.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Task.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Task.java Thu Mar 31 22:23:22 2011
@@ -20,25 +20,25 @@ package org.apache.hadoop.mapreduce.v2.a
import java.util.Map;
-import org.apache.hadoop.mapreduce.v2.api.Counters;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
-import org.apache.hadoop.mapreduce.v2.api.TaskReport;
-import org.apache.hadoop.mapreduce.v2.api.TaskState;
-import org.apache.hadoop.mapreduce.v2.api.TaskType;
+import org.apache.hadoop.mapreduce.v2.api.records.Counters;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
/**
* Read only view of Task.
*/
public interface Task {
- TaskID getID();
+ TaskId getID();
TaskReport getReport();
TaskState getState();
Counters getCounters();
float getProgress();
TaskType getType();
- Map<TaskAttemptID, TaskAttempt> getAttempts();
- TaskAttempt getAttempt(TaskAttemptID attemptID);
+ Map<TaskAttemptId, TaskAttempt> getAttempts();
+ TaskAttempt getAttempt(TaskAttemptId attemptID);
/** Has Task reached the final state or not.
*/
@@ -52,7 +52,7 @@ public interface Task {
* @param taskAttemptID
* @return whether the attempt's output can be committed or not.
*/
- boolean canCommit(TaskAttemptID taskAttemptID);
+ boolean canCommit(TaskAttemptId taskAttemptID);
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java Thu Mar 31 22:23:22 2011
@@ -20,19 +20,20 @@ package org.apache.hadoop.mapreduce.v2.a
import java.util.List;
-import org.apache.hadoop.yarn.ContainerID;
-import org.apache.hadoop.mapreduce.v2.api.Counters;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptReport;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.Counters;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
/**
* Read only view of TaskAttempt.
*/
public interface TaskAttempt {
- TaskAttemptID getID();
+ TaskAttemptId getID();
TaskAttemptReport getReport();
- List<CharSequence> getDiagnostics();
+ List<String> getDiagnostics();
Counters getCounters();
float getProgress();
TaskAttemptState getState();
@@ -43,7 +44,7 @@ public interface TaskAttempt {
/**If container Assigned then return container ID, otherwise null.
*/
- ContainerID getAssignedContainerID();
+ ContainerId getAssignedContainerID();
/**If container Assigned then return container mgr address, otherwise null.
*/
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobDiagnosticsUpdateEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobDiagnosticsUpdateEvent.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobDiagnosticsUpdateEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobDiagnosticsUpdateEvent.java Thu Mar 31 22:23:22 2011
@@ -18,13 +18,14 @@
package org.apache.hadoop.mapreduce.v2.app.job.event;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+
public class JobDiagnosticsUpdateEvent extends JobEvent {
private String diagnosticUpdate;
- public JobDiagnosticsUpdateEvent(JobID jobID, String diagnostic) {
+ public JobDiagnosticsUpdateEvent(JobId jobID, String diagnostic) {
super(jobID, JobEventType.JOB_DIAGNOSTIC_UPDATE);
this.diagnosticUpdate = diagnostic;
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEvent.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEvent.java Thu Mar 31 22:23:22 2011
@@ -19,7 +19,7 @@
package org.apache.hadoop.mapreduce.v2.app.job.event;
import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
/**
* This class encapsulates job related events.
@@ -27,14 +27,14 @@ import org.apache.hadoop.mapreduce.v2.ap
*/
public class JobEvent extends AbstractEvent<JobEventType> {
- private JobID jobID;
+ private JobId jobID;
- public JobEvent(JobID jobID, JobEventType type) {
+ public JobEvent(JobId jobID, JobEventType type) {
super(type);
this.jobID = jobID;
}
- public JobID getJobId() {
+ public JobId getJobId() {
return jobID;
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobFinishEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobFinishEvent.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobFinishEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobFinishEvent.java Thu Mar 31 22:23:22 2011
@@ -18,8 +18,8 @@
package org.apache.hadoop.mapreduce.v2.app.job.event;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
public class JobFinishEvent
extends AbstractEvent<JobFinishEvent.Type> {
@@ -28,14 +28,14 @@ public class JobFinishEvent
STATE_CHANGED
}
- private JobID jobID;
+ private JobId jobID;
- public JobFinishEvent(JobID jobID) {
+ public JobFinishEvent(JobId jobID) {
super(Type.STATE_CHANGED);
this.jobID = jobID;
}
- public JobID getJobId() {
+ public JobId getJobId() {
return jobID;
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobMapTaskRescheduledEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobMapTaskRescheduledEvent.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobMapTaskRescheduledEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobMapTaskRescheduledEvent.java Thu Mar 31 22:23:22 2011
@@ -18,19 +18,20 @@
package org.apache.hadoop.mapreduce.v2.app.job.event;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+
public class JobMapTaskRescheduledEvent extends JobEvent {
- private TaskID taskID;
+ private TaskId taskID;
- public JobMapTaskRescheduledEvent(TaskID taskID) {
- super(taskID.jobID, JobEventType.JOB_MAP_TASK_RESCHEDULED);
+ public JobMapTaskRescheduledEvent(TaskId taskID) {
+ super(taskID.getJobId(), JobEventType.JOB_MAP_TASK_RESCHEDULED);
this.taskID = taskID;
}
- public TaskID getTaskID() {
+ public TaskId getTaskID() {
return taskID;
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskAttemptCompletedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskAttemptCompletedEvent.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskAttemptCompletedEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskAttemptCompletedEvent.java Thu Mar 31 22:23:22 2011
@@ -18,7 +18,7 @@
package org.apache.hadoop.mapreduce.v2.app.job.event;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
public class JobTaskAttemptCompletedEvent extends JobEvent {
@@ -26,7 +26,7 @@ public class JobTaskAttemptCompletedEven
private TaskAttemptCompletionEvent completionEvent;
public JobTaskAttemptCompletedEvent(TaskAttemptCompletionEvent completionEvent) {
- super(completionEvent.attemptId.taskID.jobID,
+ super(completionEvent.getAttemptId().getTaskId().getJobId(),
JobEventType.JOB_TASK_ATTEMPT_COMPLETED_EVENT);
this.completionEvent = completionEvent;
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskAttemptFetchFailureEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskAttemptFetchFailureEvent.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskAttemptFetchFailureEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskAttemptFetchFailureEvent.java Thu Mar 31 22:23:22 2011
@@ -20,27 +20,28 @@ package org.apache.hadoop.mapreduce.v2.a
import java.util.List;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+
public class JobTaskAttemptFetchFailureEvent extends JobEvent {
- private final TaskAttemptID reduce;
- private final List<TaskAttemptID> maps;
+ private final TaskAttemptId reduce;
+ private final List<TaskAttemptId> maps;
- public JobTaskAttemptFetchFailureEvent(TaskAttemptID reduce,
- List<TaskAttemptID> maps) {
- super(reduce.taskID.jobID,
+ public JobTaskAttemptFetchFailureEvent(TaskAttemptId reduce,
+ List<TaskAttemptId> maps) {
+ super(reduce.getTaskId().getJobId(),
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE);
this.reduce = reduce;
this.maps = maps;
}
- public List<TaskAttemptID> getMaps() {
+ public List<TaskAttemptId> getMaps() {
return maps;
}
- public TaskAttemptID getReduce() {
+ public TaskAttemptId getReduce() {
return reduce;
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskEvent.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobTaskEvent.java Thu Mar 31 22:23:22 2011
@@ -18,22 +18,22 @@
package org.apache.hadoop.mapreduce.v2.app.job.event;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
-import org.apache.hadoop.mapreduce.v2.api.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
public class JobTaskEvent extends JobEvent {
- private TaskID taskID;
+ private TaskId taskID;
private TaskState taskState;
- public JobTaskEvent(TaskID taskID, TaskState taskState) {
- super(taskID.jobID, JobEventType.JOB_TASK_COMPLETED);
+ public JobTaskEvent(TaskId taskID, TaskState taskState) {
+ super(taskID.getJobId(), JobEventType.JOB_TASK_COMPLETED);
this.taskID = taskID;
this.taskState = taskState;
}
- public TaskID getTaskID() {
+ public TaskId getTaskID() {
return taskID;
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerAssignedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerAssignedEvent.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerAssignedEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptContainerAssignedEvent.java Thu Mar 31 22:23:22 2011
@@ -18,18 +18,20 @@
package org.apache.hadoop.mapreduce.v2.app.job.event;
-import org.apache.hadoop.yarn.ContainerID;
-import org.apache.hadoop.yarn.ContainerToken;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
+
+
public class TaskAttemptContainerAssignedEvent extends TaskAttemptEvent {
- private ContainerID containerID;
+ private ContainerId containerID;
private String containerManagerAddress;
private ContainerToken containerToken;
- public TaskAttemptContainerAssignedEvent(TaskAttemptID id,
- ContainerID containerID, String containerManagerAddress,
+ public TaskAttemptContainerAssignedEvent(TaskAttemptId id,
+ ContainerId containerID, String containerManagerAddress,
ContainerToken containerToken) {
super(id, TaskAttemptEventType.TA_ASSIGNED);
this.containerID = containerID;
@@ -37,7 +39,7 @@ public class TaskAttemptContainerAssigne
this.containerToken = containerToken;
}
- public ContainerID getContainerID() {
+ public ContainerId getContainerID() {
return this.containerID;
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptDiagnosticsUpdateEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptDiagnosticsUpdateEvent.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptDiagnosticsUpdateEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptDiagnosticsUpdateEvent.java Thu Mar 31 22:23:22 2011
@@ -18,13 +18,14 @@
package org.apache.hadoop.mapreduce.v2.app.job.event;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+
public class TaskAttemptDiagnosticsUpdateEvent extends TaskAttemptEvent {
private String diagnosticInfo;
- public TaskAttemptDiagnosticsUpdateEvent(TaskAttemptID attemptID,
+ public TaskAttemptDiagnosticsUpdateEvent(TaskAttemptId attemptID,
String diagnosticInfo) {
super(attemptID, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE);
this.diagnosticInfo = diagnosticInfo;
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEvent.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEvent.java Thu Mar 31 22:23:22 2011
@@ -19,7 +19,7 @@
package org.apache.hadoop.mapreduce.v2.app.job.event;
import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
/**
* This class encapsulates task attempt related events.
@@ -27,14 +27,14 @@ import org.apache.hadoop.mapreduce.v2.ap
*/
public class TaskAttemptEvent extends AbstractEvent<TaskAttemptEventType> {
- private TaskAttemptID attemptID;
+ private TaskAttemptId attemptID;
- public TaskAttemptEvent(TaskAttemptID id, TaskAttemptEventType type) {
+ public TaskAttemptEvent(TaskAttemptId id, TaskAttemptEventType type) {
super(type);
this.attemptID = id;
}
- public TaskAttemptID getTaskAttemptID() {
+ public TaskAttemptId getTaskAttemptID() {
return attemptID;
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java Thu Mar 31 22:23:22 2011
@@ -20,13 +20,16 @@ package org.apache.hadoop.mapreduce.v2.a
import java.util.List;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.records.Counters;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+
public class TaskAttemptStatusUpdateEvent extends TaskAttemptEvent {
private TaskAttemptStatus reportedTaskAttemptStatus;
- public TaskAttemptStatusUpdateEvent(TaskAttemptID id,
+ public TaskAttemptStatusUpdateEvent(TaskAttemptId id,
TaskAttemptStatus taskAttemptStatus) {
super(id, TaskAttemptEventType.TA_UPDATE);
this.reportedTaskAttemptStatus = taskAttemptStatus;
@@ -41,13 +44,13 @@ public class TaskAttemptStatusUpdateEven
*
*/
public static class TaskAttemptStatus {
- public org.apache.hadoop.mapreduce.v2.api.TaskAttemptID id;
+ public TaskAttemptId id;
public float progress;
- public org.apache.hadoop.mapreduce.v2.api.Counters counters;
- public java.lang.CharSequence diagnosticInfo;
- public java.lang.CharSequence stateString;
- public org.apache.hadoop.mapreduce.v2.api.Phase phase;
+ public Counters counters;
+ public String diagnosticInfo;
+ public String stateString;
+ public Phase phase;
public long outputSize;
- public List<TaskAttemptID> fetchFailedMaps;
+ public List<TaskAttemptId> fetchFailedMaps;
}
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskEvent.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskEvent.java Thu Mar 31 22:23:22 2011
@@ -19,7 +19,7 @@
package org.apache.hadoop.mapreduce.v2.app.job.event;
import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
/**
* this class encapsulates task related events.
@@ -27,14 +27,14 @@ import org.apache.hadoop.mapreduce.v2.ap
*/
public class TaskEvent extends AbstractEvent<TaskEventType> {
- private TaskID taskID;
+ private TaskId taskID;
- public TaskEvent(TaskID taskID, TaskEventType type) {
+ public TaskEvent(TaskId taskID, TaskEventType type) {
super(type);
this.taskID = taskID;
}
- public TaskID getTaskID() {
+ public TaskId getTaskID() {
return taskID;
}
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskTAttemptEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskTAttemptEvent.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskTAttemptEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskTAttemptEvent.java Thu Mar 31 22:23:22 2011
@@ -18,18 +18,19 @@
package org.apache.hadoop.mapreduce.v2.app.job.event;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+
public class TaskTAttemptEvent extends TaskEvent {
- private TaskAttemptID attemptID;
+ private TaskAttemptId attemptID;
- public TaskTAttemptEvent(TaskAttemptID id, TaskEventType type) {
- super(id.taskID, type);
+ public TaskTAttemptEvent(TaskAttemptId id, TaskEventType type) {
+ super(id.getTaskId(), type);
this.attemptID = id;
}
- public TaskAttemptID getTaskAttemptID() {
+ public TaskAttemptId getTaskAttemptID() {
return attemptID;
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Thu Mar 31 22:23:22 2011
@@ -60,6 +60,18 @@ import org.apache.hadoop.mapreduce.split
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.v2.api.records.Counter;
+import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
+import org.apache.hadoop.mapreduce.v2.api.records.Counters;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
@@ -79,27 +91,17 @@ import org.apache.hadoop.security.author
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
-import org.apache.hadoop.yarn.ApplicationID;
-import org.apache.hadoop.mapreduce.v2.api.Counter;
-import org.apache.hadoop.mapreduce.v2.api.CounterGroup;
-import org.apache.hadoop.mapreduce.v2.api.Counters;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
-import org.apache.hadoop.mapreduce.v2.api.JobReport;
-import org.apache.hadoop.mapreduce.v2.api.JobState;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEventStatus;
-import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
-import org.apache.hadoop.mapreduce.v2.api.TaskID;
-import org.apache.hadoop.mapreduce.v2.api.TaskState;
-import org.apache.hadoop.mapreduce.v2.api.TaskType;
/** Implementation of Job interface. Maintains the state machines of Job.
* The read and write calls use ReadWriteLock for concurrency.
@@ -117,17 +119,19 @@ public class JobImpl implements org.apac
// Maximum no. of fetch-failure notifications after which map task is failed
private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
+ private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+
//final fields
private final Lock readLock;
private final Lock writeLock;
- private final JobID jobId;
+ private final JobId jobId;
private final org.apache.hadoop.mapreduce.JobID oldJobId;
private final TaskAttemptListener taskAttemptListener;
private boolean lazyTasksCopyNeeded = false;
private final Object tasksSyncHandle = new Object();
- private volatile Map<TaskID, Task> tasks = new LinkedHashMap<TaskID, Task>();
- private final Set<TaskID> mapTasks = new LinkedHashSet<TaskID>();
- private final Set<TaskID> reduceTasks = new LinkedHashSet<TaskID>();
+ private volatile Map<TaskId, Task> tasks = new LinkedHashMap<TaskId, Task>();
+ private final Set<TaskId> mapTasks = new LinkedHashSet<TaskId>();
+ private final Set<TaskId> reduceTasks = new LinkedHashSet<TaskId>();
private final EventHandler eventHandler;
public Configuration conf;
@@ -144,10 +148,10 @@ public class JobImpl implements org.apac
private final List<String> diagnostics = new ArrayList<String>();
//task/attempt related datastructures
- private final Map<TaskID, Integer> successAttemptCompletionEventNoMap =
- new HashMap<TaskID, Integer>();
- private final Map<TaskAttemptID, Integer> fetchFailuresMapping =
- new HashMap<TaskAttemptID, Integer>();
+ private final Map<TaskId, Integer> successAttemptCompletionEventNoMap =
+ new HashMap<TaskId, Integer>();
+ private final Map<TaskAttemptId, Integer> fetchFailuresMapping =
+ new HashMap<TaskAttemptId, Integer>();
private static final TaskAttemptCompletedEventTransition TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
new TaskAttemptCompletedEventTransition();
@@ -301,15 +305,15 @@ public class JobImpl implements org.apac
private Token<JobTokenIdentifier> jobToken;
private JobTokenSecretManager jobTokenSecretManager;
- public JobImpl(ApplicationID appID, Configuration conf,
+ public JobImpl(ApplicationId appID, Configuration conf,
EventHandler eventHandler, TaskAttemptListener taskAttemptListener,
JobTokenSecretManager jobTokenSecretManager,
Credentials fsTokenCredentials) {
- this.jobId = new JobID();
+ this.jobId = recordFactory.newRecordInstance(JobId.class);
this.conf = conf;
- this.jobId.appID = appID;
- this.jobId.id = appID.id;
+ jobId.setAppId(appID);
+ jobId.setId(appID.getId());
oldJobId = TypeConverter.fromYarn(jobId);
LOG.info("Job created" +
" appId=" + appID +
@@ -335,12 +339,12 @@ public class JobImpl implements org.apac
}
@Override
- public JobID getID() {
+ public JobId getID() {
return jobId;
}
@Override
- public Task getTask(TaskID taskID) {
+ public Task getTask(TaskId taskID) {
readLock.lock();
try {
return tasks.get(taskID);
@@ -383,8 +387,8 @@ public class JobImpl implements org.apac
}
public static Counters newCounters() {
- Counters counters = new Counters();
- counters.groups = new HashMap<CharSequence, CounterGroup>();
+ Counters counters = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Counters.class);
+// counters.groups = new HashMap<String, CounterGroup>();
return counters;
}
@@ -398,24 +402,24 @@ public class JobImpl implements org.apac
public static void incrAllCounters(Counters counters, Counters other) {
if (other != null) {
- for (CounterGroup otherGroup: other.groups.values()) {
- CounterGroup group = counters.groups.get(otherGroup.name);
+ for (CounterGroup otherGroup: other.getAllCounterGroups().values()) {
+ CounterGroup group = counters.getCounterGroup(otherGroup.getName());
if (group == null) {
- group = new CounterGroup();
- group.counters = new HashMap<CharSequence, Counter>();
- group.name = otherGroup.name;
- counters.groups.put(group.name, group);
+ group = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(CounterGroup.class);
+// group.counters = new HashMap<CharSequence, Counter>();
+ group.setName(otherGroup.getName());
+ counters.setCounterGroup(group.getName(), group);
}
- group.displayname = otherGroup.displayname;
- for (Counter otherCounter : otherGroup.counters.values()) {
- Counter counter = group.counters.get(otherCounter.name);
+ group.setDisplayName(otherGroup.getDisplayName());
+ for (Counter otherCounter : otherGroup.getAllCounters().values()) {
+ Counter counter = group.getCounter(otherCounter.getName());
if (counter == null) {
- counter = new Counter();
- counter.name = otherCounter.name;
- group.counters.put(counter.name, counter);
+ counter = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Counter.class);
+ counter.setName(otherCounter.getName());
+ group.setCounter(counter.getName(), counter);
}
- counter.displayName = otherCounter.displayName;
- counter.value += otherCounter.value;
+ counter.setDisplayName(otherCounter.getDisplayName());
+ counter.setValue(counter.getValue() + otherCounter.getValue());
}
}
}
@@ -453,32 +457,33 @@ public class JobImpl implements org.apac
public JobReport getReport() {
readLock.lock();
try {
- JobReport report = new JobReport();
- report.id = jobId;
- report.state = getState();
+ JobReport report = recordFactory.newRecordInstance(JobReport.class);
+ report.setJobId(jobId);
+ report.setJobState(getState());
// TODO - Fix to correctly setup report and to check state
- if (report.state == JobState.NEW) {
+ if (report.getJobState() == JobState.NEW) {
return report;
}
- report.startTime = startTime;
- report.finishTime = finishTime;
- report.setupProgress = setupProgress;
- report.cleanupProgress = cleanupProgress;
- report.mapProgress = computeProgress(mapTasks);
- report.reduceProgress = computeProgress(reduceTasks);
+ report.setStartTime(startTime);
+ report.setFinishTime(finishTime);
+ report.setSetupProgress(setupProgress);
+ report.setCleanupProgress(cleanupProgress);
+ report.setMapProgress(computeProgress(mapTasks));
+ report.setReduceProgress(computeProgress(reduceTasks));
+
return report;
} finally {
readLock.unlock();
}
}
- private float computeProgress(Set<TaskID> taskIds) {
+ private float computeProgress(Set<TaskId> taskIds) {
readLock.lock();
try {
float progress = 0;
- for (TaskID taskId : taskIds) {
+ for (TaskId taskId : taskIds) {
Task task = tasks.get(taskId);
progress += task.getProgress();
}
@@ -493,7 +498,7 @@ public class JobImpl implements org.apac
}
@Override
- public Map<TaskID, Task> getTasks() {
+ public Map<TaskId, Task> getTasks() {
synchronized (tasksSyncHandle) {
lazyTasksCopyNeeded = true;
return Collections.unmodifiableMap(tasks);
@@ -501,10 +506,10 @@ public class JobImpl implements org.apac
}
@Override
- public Map<TaskID,Task> getTasks(TaskType taskType) {
- Map<TaskID, Task> localTasksCopy = tasks;
- Map<TaskID, Task> result = new HashMap<TaskID, Task>();
- Set<TaskID> tasksOfGivenType = null;
+ public Map<TaskId,Task> getTasks(TaskType taskType) {
+ Map<TaskId, Task> localTasksCopy = tasks;
+ Map<TaskId, Task> result = new HashMap<TaskId, Task>();
+ Set<TaskId> tasksOfGivenType = null;
readLock.lock();
try {
if (TaskType.MAP == taskType) {
@@ -512,7 +517,7 @@ public class JobImpl implements org.apac
} else if (TaskType.REDUCE == taskType) {
tasksOfGivenType = reduceTasks;
}
- for (TaskID taskID : tasksOfGivenType)
+ for (TaskId taskID : tasksOfGivenType)
result.put(taskID, localTasksCopy.get(taskID));
return result;
} finally {
@@ -530,8 +535,8 @@ public class JobImpl implements org.apac
}
}
- protected void scheduleTasks(Set<TaskID> taskIDs) {
- for (TaskID taskID : taskIDs) {
+ protected void scheduleTasks(Set<TaskId> taskIDs) {
+ for (TaskId taskID : taskIDs) {
eventHandler.handle(new TaskEvent(taskID,
TaskEventType.T_SCHEDULE));
}
@@ -571,7 +576,7 @@ public class JobImpl implements org.apac
protected void addTask(Task task) {
synchronized (tasksSyncHandle) {
if (lazyTasksCopyNeeded) {
- Map<TaskID, Task> newTasks = new LinkedHashMap<TaskID, Task>();
+ Map<TaskId, Task> newTasks = new LinkedHashMap<TaskId, Task>();
newTasks.putAll(tasks);
tasks = newTasks;
lazyTasksCopyNeeded = false;
@@ -591,7 +596,7 @@ public class JobImpl implements org.apac
}
@Override
- public CharSequence getName() {
+ public String getName() {
return "FIXME! job name";
}
@@ -616,11 +621,13 @@ public class JobImpl implements org.apac
job.oldJobId);
job.fs = FileSystem.get(job.conf);
- org.apache.hadoop.mapreduce.v2.api.TaskAttemptID
- attemptID = new org.apache.hadoop.mapreduce.v2.api.TaskAttemptID();
- attemptID.taskID = new TaskID();
- attemptID.taskID.jobID = job.jobId;
- attemptID.taskID.taskType = TaskType.MAP; //TODO:fix task type ??
+ org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId
+ attemptID = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId.class);
+ //TODO_get.set
+ attemptID.setTaskId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskId.class));
+ attemptID.getTaskId().setJobId(job.jobId);
+ attemptID.getTaskId().setTaskType(TaskType.MAP);//TODO:fix task type ??
+
TaskAttemptContext taskContext =
new TaskAttemptContextImpl(job.conf,
TypeConverter.fromYarn(attemptID));
@@ -771,7 +778,7 @@ public class JobImpl implements org.apac
jobId + " = " + job.numReduceTasks);
}
- protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobID jobId) {
+ protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
TaskSplitMetaInfo[] allTaskSplitMetaInfo;
try {
allTaskSplitMetaInfo = SplitMetaInfoReader.readSplitMetaInfo(
@@ -845,21 +852,21 @@ public class JobImpl implements org.apac
((JobTaskAttemptCompletedEvent) event).getCompletionEvent();
// Add the TaskAttemptCompletionEvent
//eventId is equal to index in the arraylist
- tce.eventId = job.taskAttemptCompletionEvents.size();
+ tce.setEventId(job.taskAttemptCompletionEvents.size());
job.taskAttemptCompletionEvents.add(tce);
//make the previous completion event as obsolete if it exists
Object successEventNo =
- job.successAttemptCompletionEventNoMap.remove(tce.attemptId.taskID);
+ job.successAttemptCompletionEventNoMap.remove(tce.getAttemptId().getTaskId());
if (successEventNo != null) {
TaskAttemptCompletionEvent successEvent =
job.taskAttemptCompletionEvents.get((Integer) successEventNo);
- successEvent.status = TaskAttemptCompletionEventStatus.OBSOLETE;
+ successEvent.setStatus(TaskAttemptCompletionEventStatus.OBSOLETE);
}
- if (TaskAttemptCompletionEventStatus.SUCCEEDED.equals(tce.status)) {
- job.successAttemptCompletionEventNoMap.put(tce.attemptId.taskID,
- tce.eventId);
+ if (TaskAttemptCompletionEventStatus.SUCCEEDED.equals(tce.getStatus())) {
+ job.successAttemptCompletionEventNoMap.put(tce.getAttemptId().getTaskId(),
+ tce.getEventId());
}
}
}
@@ -870,7 +877,7 @@ public class JobImpl implements org.apac
public void transition(JobImpl job, JobEvent event) {
JobTaskAttemptFetchFailureEvent fetchfailureEvent =
(JobTaskAttemptFetchFailureEvent) event;
- for (org.apache.hadoop.mapreduce.v2.api.TaskAttemptID mapId :
+ for (org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId mapId :
fetchfailureEvent.getMaps()) {
Integer fetchFailures = job.fetchFailuresMapping.get(mapId);
fetchFailures = (fetchFailures == null) ? 1 : (fetchFailures+1);
@@ -878,7 +885,7 @@ public class JobImpl implements org.apac
//get number of running reduces
int runningReduceTasks = 0;
- for (TaskID taskId : job.reduceTasks) {
+ for (TaskId taskId : job.reduceTasks) {
if (TaskState.RUNNING.equals(job.tasks.get(taskId).getState())) {
runningReduceTasks++;
}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java Thu Mar 31 22:23:22 2011
@@ -28,18 +28,19 @@ import org.apache.hadoop.mapreduce.MRJob
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
-import org.apache.hadoop.mapreduce.v2.api.TaskType;
+
public class MapTaskImpl extends TaskImpl {
private final TaskSplitMetaInfo taskSplitMetaInfo;
- public MapTaskImpl(JobID jobId, int partition, EventHandler eventHandler,
+ public MapTaskImpl(JobId jobId, int partition, EventHandler eventHandler,
Path remoteJobConfFile, Configuration conf,
TaskSplitMetaInfo taskSplitMetaInfo,
TaskAttemptListener taskAttemptListener, OutputCommitter committer,
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java?rev=1087462&r1=1087461&r2=1087462&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java Thu Mar 31 22:23:22 2011
@@ -19,7 +19,6 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl;
import java.util.Collection;
-import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -27,18 +26,18 @@ import org.apache.hadoop.mapred.ReduceTa
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.mapreduce.v2.api.JobID;
-import org.apache.hadoop.mapreduce.v2.api.TaskType;
public class ReduceTaskImpl extends TaskImpl {
private final int numMapTasks;
- public ReduceTaskImpl(JobID jobId, int partition,
+ public ReduceTaskImpl(JobId jobId, int partition,
EventHandler eventHandler, Path jobFile, Configuration conf,
int numMapTasks, TaskAttemptListener taskAttemptListener,
OutputCommitter committer, Token<JobTokenIdentifier> jobToken,