You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2011/07/26 08:40:28 UTC
svn commit: r1150993 - in /hadoop/common/branches/MR-279/mapreduce: ./
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/
mr-client/h...
Author: sharad
Date: Tue Jul 26 06:40:26 2011
New Revision: 1150993
URL: http://svn.apache.org/viewvc?rev=1150993&view=rev
Log:
MAPREDUCE-2664. Implement JobCounters for MRv2. Contributed by Siddharth Seth.
Added:
hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobCounterUpdateEvent.java
Modified:
hadoop/common/branches/MR-279/mapreduce/CHANGES.txt
hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java
hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/Counters.java
hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/CountersPBImpl.java
hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java
Modified: hadoop/common/branches/MR-279/mapreduce/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/CHANGES.txt?rev=1150993&r1=1150992&r2=1150993&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/CHANGES.txt (original)
+++ hadoop/common/branches/MR-279/mapreduce/CHANGES.txt Tue Jul 26 06:40:26 2011
@@ -5,6 +5,9 @@ Trunk (unreleased changes)
MAPREDUCE-279
+ MAPREDUCE-2664. Implement JobCounters for MRv2. (Siddharth Seth via
+ sharad)
+
MAPREDUCE-2667. mapred job -kill leaves application in RUNNING state
(thomas graves via mahadev)
Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java?rev=1150993&r1=1150992&r2=1150993&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java Tue Jul 26 06:40:26 2011
@@ -36,12 +36,14 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.AMConstants;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
@@ -205,9 +207,23 @@ public class LocalContainerLauncher exte
}
try {
+ if (remoteTask.isMapOrReduce()) {
+ JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
+ jce.addCounterUpdate(JobCounter.TOTAL_LAUNCHED_UBERTASKS, 1);
+ if (remoteTask.isMapTask()) {
+ jce.addCounterUpdate(JobCounter.NUM_UBER_SUBMAPS, 1);
+ } else {
+ jce.addCounterUpdate(JobCounter.NUM_UBER_SUBREDUCES, 1);
+ }
+ context.getEventHandler().handle(jce);
+ }
runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks,
(numReduceTasks > 0));
+
} catch (RuntimeException re) {
+ JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
+ jce.addCounterUpdate(JobCounter.NUM_FAILED_UBERTASKS, 1);
+ context.getEventHandler().handle(jce);
// this is our signal that the subtask failed in some way, so
// simulate a failed JVM/container and send a container-completed
// event to task attempt (i.e., move state machine from RUNNING
Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1150993&r1=1150992&r2=1150993&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Tue Jul 26 06:40:26 2011
@@ -36,6 +36,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.v2.api.records.Counter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
@@ -462,6 +464,7 @@ public class JobHistoryEventHandler exte
.toString());
// TODO JOB_FINISHED does not have state. Effectively job history does not
// have state about the finished job.
+ setSummarySlotSeconds(summary, jobId);
break;
case JOB_FAILED:
case JOB_KILLED:
@@ -470,10 +473,26 @@ public class JobHistoryEventHandler exte
summary.setNumFinishedMaps(context.getJob(jobId).getTotalMaps());
summary.setNumFinishedReduces(context.getJob(jobId).getTotalReduces());
summary.setJobFinishTime(juce.getFinishTime());
+ setSummarySlotSeconds(summary, jobId);
break;
}
}
+ // Populates the job-summary slot-second fields from the job-level
+ // SLOTS_MILLIS_MAPS / SLOTS_MILLIS_REDUCES counters. Null-safe:
+ // either counter may be absent (e.g. no tasks of that type ran).
+ private void setSummarySlotSeconds(JobSummary summary, JobId jobId) {
+ Counter slotMillisMapCounter =
+ context.getJob(jobId).getCounters()
+ .getCounter(JobCounter.SLOTS_MILLIS_MAPS);
+ if (slotMillisMapCounter != null) {
+ summary.setMapSlotSeconds(slotMillisMapCounter.getValue());
+ }
+ Counter slotMillisReduceCounter =
+ context.getJob(jobId).getCounters()
+ .getCounter(JobCounter.SLOTS_MILLIS_REDUCES);
+ if (slotMillisReduceCounter != null) {
+ // BUG FIX: previously called setMapSlotSeconds here, which
+ // clobbered the map value and left the reduce field unset.
+ summary.setReduceSlotSeconds(slotMillisReduceCounter.getValue());
+ }
+ }
+
protected void closeEventWriter(JobId jobId) throws IOException {
final MetaInfo mi = fileMap.get(jobId);
Added: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobCounterUpdateEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobCounterUpdateEvent.java?rev=1150993&view=auto
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobCounterUpdateEvent.java (added)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobCounterUpdateEvent.java Tue Jul 26 06:40:26 2011
@@ -0,0 +1,42 @@
+package org.apache.hadoop.mapreduce.v2.app.job.event;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+
+// NOTE(review): this new file is missing the standard Apache license
+// header carried by the other files in this module — add before commit.
+/**
+ * Job-level event carrying a batch of incremental counter updates.
+ * Producers (e.g. TaskAttemptImpl, LocalContainerLauncher) queue
+ * (counter key, increment) pairs; JobImpl's JOB_COUNTER_UPDATE
+ * transition applies each pair to the job's counters.
+ */
+public class JobCounterUpdateEvent extends JobEvent {
+
+ List<CounterIncrementalUpdate> counterUpdates = null;
+
+ public JobCounterUpdateEvent(JobId jobId) {
+ super(jobId, JobEventType.JOB_COUNTER_UPDATE);
+ counterUpdates = new ArrayList<JobCounterUpdateEvent.CounterIncrementalUpdate>();
+ }
+
+ /** Queues one increment of {@code incrValue} for the given counter key. */
+ public void addCounterUpdate(Enum<?> key, long incrValue) {
+ counterUpdates.add(new CounterIncrementalUpdate(key, incrValue));
+ }
+
+ public List<CounterIncrementalUpdate> getCounterUpdates() {
+ return counterUpdates;
+ }
+
+ /** A single (counter key, increment amount) pair; set once at construction. */
+ public static class CounterIncrementalUpdate {
+ Enum<?> key;
+ long incrValue;
+
+ public CounterIncrementalUpdate(Enum<?> key, long incrValue) {
+ this.key = key;
+ this.incrValue = incrValue;
+ }
+
+ public Enum<?> getCounterKey() {
+ return key;
+ }
+
+ public long getIncrementValue() {
+ return incrValue;
+ }
+ }
+}
Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java?rev=1150993&r1=1150992&r2=1150993&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java Tue Jul 26 06:40:26 2011
@@ -41,6 +41,7 @@ public enum JobEventType {
//Producer:Any component
JOB_DIAGNOSTIC_UPDATE,
INTERNAL_ERROR,
+ JOB_COUNTER_UPDATE,
//Producer:TaskAttemptListener
JOB_TASK_ATTEMPT_FETCH_FAILURE
Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1150993&r1=1150992&r2=1150993&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Tue Jul 26 06:40:26 2011
@@ -79,6 +79,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
@@ -149,12 +150,8 @@ public class JobImpl implements org.apac
private boolean lazyTasksCopyNeeded = false;
private volatile Map<TaskId, Task> tasks = new LinkedHashMap<TaskId, Task>();
private Counters jobCounters = newCounters();
- // FIXME: support job-level counters
+ // FIXME:
//
- // Presumably want to define new event type that job-related entities
- // (e.g., MRAppMaster or LocalContainerLauncher) can emit with some sort
- // of payload (maybe just Counters?); then define new Job state-machine
- // transition to handle the event and update jobCounters with payload data.
// Can then replace task-level uber counters (MR-2424) with job-level ones
// sent from LocalContainerLauncher, and eventually including a count of
// of uber-AM attempts (probably sent from MRAppMaster).
@@ -184,6 +181,8 @@ public class JobImpl implements org.apac
private static final TaskAttemptCompletedEventTransition
TASK_ATTEMPT_COMPLETED_EVENT_TRANSITION =
new TaskAttemptCompletedEventTransition();
+ private static final CounterUpdateTransition COUNTER_UPDATE_TRANSITION =
+ new CounterUpdateTransition();
protected static final
StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent>
@@ -195,6 +194,8 @@ public class JobImpl implements org.apac
.addTransition(JobState.NEW, JobState.NEW,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(JobState.NEW, JobState.NEW,
+ JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition
(JobState.NEW,
EnumSet.of(JobState.INITED, JobState.FAILED),
@@ -211,6 +212,8 @@ public class JobImpl implements org.apac
.addTransition(JobState.INITED, JobState.INITED,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(JobState.INITED, JobState.INITED,
+ JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition(JobState.INITED, JobState.RUNNING,
JobEventType.JOB_START,
new StartTransition())
@@ -244,6 +247,8 @@ public class JobImpl implements org.apac
JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(JobState.RUNNING, JobState.RUNNING,
+ JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
+ .addTransition(JobState.RUNNING, JobState.RUNNING,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
new TaskAttemptFetchFailureTransition())
.addTransition(
@@ -263,6 +268,8 @@ public class JobImpl implements org.apac
.addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
+ JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition(
JobState.KILL_WAIT,
JobState.ERROR, JobEventType.INTERNAL_ERROR,
@@ -277,6 +284,8 @@ public class JobImpl implements org.apac
.addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED,
+ JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition(
JobState.SUCCEEDED,
JobState.ERROR, JobEventType.INTERNAL_ERROR,
@@ -290,6 +299,8 @@ public class JobImpl implements org.apac
.addTransition(JobState.FAILED, JobState.FAILED,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(JobState.FAILED, JobState.FAILED,
+ JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition(
JobState.FAILED,
JobState.ERROR, JobEventType.INTERNAL_ERROR,
@@ -303,6 +314,8 @@ public class JobImpl implements org.apac
.addTransition(JobState.KILLED, JobState.KILLED,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(JobState.KILLED, JobState.KILLED,
+ JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition(
JobState.KILLED,
JobState.ERROR, JobEventType.INTERNAL_ERROR,
@@ -460,7 +473,6 @@ public class JobImpl implements org.apac
@Override
public Counters getCounters() {
Counters counters = newCounters();
- // TODO: compute job-level counters
readLock.lock();
try {
incrAllCounters(counters, jobCounters);
@@ -500,7 +512,6 @@ public class JobImpl implements org.apac
public static Counters newCounters() {
Counters counters = RecordFactoryProvider.getRecordFactory(null)
.newRecordInstance(Counters.class);
-// counters.groups = new HashMap<String, CounterGroup>();
return counters;
}
@@ -519,7 +530,6 @@ public class JobImpl implements org.apac
if (group == null) {
group = RecordFactoryProvider.getRecordFactory(null)
.newRecordInstance(CounterGroup.class);
-// group.counters = new HashMap<CharSequence, Counter>();
group.setName(otherGroup.getName());
counters.setCounterGroup(group.getName(), group);
}
@@ -1363,7 +1373,7 @@ public class JobImpl implements org.apac
private void addDiagnostic(String diag) {
diagnostics.add(diag);
}
-
+
private static class DiagnosticsUpdateTransition implements
SingleArcTransition<JobImpl, JobEvent> {
@Override
@@ -1372,6 +1382,18 @@ public class JobImpl implements org.apac
.getDiagnosticUpdate());
}
}
+
+ // Applies a JobCounterUpdateEvent: folds each queued (key, increment)
+ // pair into the job's counters. Registered for JOB_COUNTER_UPDATE in
+ // every job state so late-arriving updates are never dropped.
+ private static class CounterUpdateTransition implements
+ SingleArcTransition<JobImpl, JobEvent> {
+ @Override
+ public void transition(JobImpl job, JobEvent event) {
+ JobCounterUpdateEvent jce = (JobCounterUpdateEvent) event;
+ for (JobCounterUpdateEvent.CounterIncrementalUpdate ci : jce
+ .getCounterUpdates()) {
+ job.jobCounters.incrCounter(ci.getCounterKey(), ci.getIncrementValue());
+ }
+ }
+ }
private static class InternalErrorTransition implements
SingleArcTransition<JobImpl, JobEvent> {
Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1150993&r1=1150992&r2=1150993&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Tue Jul 26 06:40:26 2011
@@ -49,6 +49,7 @@ import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TaskAttemptContextImpl;
import org.apache.hadoop.mapred.WrappedJvmID;
import org.apache.hadoop.mapred.WrappedProgressSplitsBlock;
+import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -73,6 +74,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
@@ -131,6 +133,8 @@ public abstract class TaskAttemptImpl im
private static final Log LOG = LogFactory.getLog(TaskAttemptImpl.class);
private static final long MEMORY_SPLITS_RESOLUTION = 1024; //TODO Make configurable?
+ private static final int MAP_MEMORY_MB_DEFAULT = 1024;
+ private static final int REDUCE_MEMORY_MB_DEFAULT = 1024;
private final static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
protected final Configuration conf;
@@ -470,9 +474,9 @@ public abstract class TaskAttemptImpl im
private int getMemoryRequired(Configuration conf, TaskType taskType) {
int memory = 1024;
if (taskType == TaskType.MAP) {
- memory = conf.getInt(MRJobConfig.MAP_MEMORY_MB, 1024);
+ memory = conf.getInt(MRJobConfig.MAP_MEMORY_MB, MAP_MEMORY_MB_DEFAULT);
} else if (taskType == TaskType.REDUCE) {
- memory = conf.getInt(MRJobConfig.REDUCE_MEMORY_MB, 1024);
+ memory = conf.getInt(MRJobConfig.REDUCE_MEMORY_MB, REDUCE_MEMORY_MB_DEFAULT);
}
return memory;
@@ -903,6 +907,42 @@ public abstract class TaskAttemptImpl im
finishTime = clock.getTime();
}
}
+
+ // Computes the MRv1-style slot-milliseconds charged by this attempt:
+ // (simulated slots) * (finish time - launch time).
+ private static long computeSlotMillis(TaskAttemptImpl taskAttempt) {
+ TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
+ int slotMemoryReq =
+ taskAttempt.getMemoryRequired(taskAttempt.conf, taskType);
+ // NOTE(review): integer division — a memory request smaller than the
+ // *_MEMORY_MB_DEFAULT yields 0 simulated slots and therefore 0 slot
+ // millis; confirm that is the intended rounding behavior.
+ int simSlotsRequired =
+ slotMemoryReq
+ / (taskType == TaskType.MAP ? MAP_MEMORY_MB_DEFAULT
+ : REDUCE_MEMORY_MB_DEFAULT);
+ // Simulating MRv1 slots for counters by assuming *_MEMORY_MB_DEFAULT
+ // corresponds to an MRv1 slot.
+ // Fallow slot millis is not applicable in MRv2 - since a container is
+ // either assigned with the required memory or is not. No partial
+ // reservations.
+ long slotMillisIncrement =
+ simSlotsRequired
+ * (taskAttempt.getFinishTime() - taskAttempt.getLaunchTime());
+ return slotMillisIncrement;
+ }
+
+ // Builds the counter-update event for an unsuccessful task attempt:
+ // bumps NUM_FAILED_MAPS/NUM_FAILED_REDUCES and charges the attempt's
+ // slot-milliseconds to SLOTS_MILLIS_MAPS/SLOTS_MILLIS_REDUCES.
+ // NOTE(review): the KILLED transitions below also route through this
+ // method, so killed attempts increment the NUM_FAILED_* counters —
+ // confirm that conflation is intended.
+ private static JobCounterUpdateEvent createJobCounterUpdateEventTAFailed(
+ TaskAttemptImpl taskAttempt) {
+ TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
+ JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskAttempt.getID().getTaskId().getJobId());
+
+ long slotMillisIncrement = computeSlotMillis(taskAttempt);
+
+ if (taskType == TaskType.MAP) {
+ jce.addCounterUpdate(JobCounter.NUM_FAILED_MAPS, 1);
+ jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, slotMillisIncrement);
+ } else {
+ jce.addCounterUpdate(JobCounter.NUM_FAILED_REDUCES, 1);
+ jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, slotMillisIncrement);
+ }
+ return jce;
+ }
private static TaskAttemptUnsuccessfulCompletionEvent createTaskAttemptUnsuccessfulCompletionEvent(
TaskAttemptImpl taskAttempt, TaskAttemptState attemptState) {
@@ -1080,8 +1120,11 @@ public abstract class TaskAttemptImpl im
break;
}
if (taskAttempt.getLaunchTime() != 0) {
- TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(
- taskAttempt, finalState);
+ TaskAttemptUnsuccessfulCompletionEvent tauce =
+ createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
+ finalState);
+ taskAttempt.eventHandler
+ .handle(createJobCounterUpdateEventTAFailed(taskAttempt));
taskAttempt.eventHandler.handle(new JobHistoryEvent(
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
} else {
@@ -1106,6 +1149,15 @@ public abstract class TaskAttemptImpl im
InetSocketAddress nodeHttpInetAddr =
NetUtils.createSocketAddr(taskAttempt.nodeHttpAddress); // TODO:
// Costly?
+ JobCounterUpdateEvent jce =
+ new JobCounterUpdateEvent(taskAttempt.attemptId.getTaskId()
+ .getJobId());
+ jce.addCounterUpdate(
+ taskAttempt.attemptId.getTaskId().getTaskType() == TaskType.MAP ?
+ JobCounter.TOTAL_LAUNCHED_MAPS: JobCounter.TOTAL_LAUNCHED_REDUCES
+ , 1);
+ taskAttempt.eventHandler.handle(jce);
+
TaskAttemptStartedEvent tase =
new TaskAttemptStartedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
@@ -1163,24 +1215,22 @@ public abstract class TaskAttemptImpl im
String taskType =
TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()).toString();
LOG.info("In TaskAttemptImpl taskType: " + taskType);
+ long slotMillis = computeSlotMillis(taskAttempt);
+ JobCounterUpdateEvent jce =
+ new JobCounterUpdateEvent(taskAttempt.attemptId.getTaskId()
+ .getJobId());
+ jce.addCounterUpdate(
+ taskAttempt.attemptId.getTaskId().getTaskType() == TaskType.MAP ?
+ JobCounter.SLOTS_MILLIS_MAPS : JobCounter.SLOTS_MILLIS_REDUCES,
+ slotMillis);
+ taskAttempt.eventHandler.handle(jce);
taskAttempt.logAttemptFinishedEvent(TaskAttemptState.SUCCEEDED);
- /*
- TaskAttemptFinishedEvent tfe =
- new TaskAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
- TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType),
- TaskAttemptState.SUCCEEDED.toString(),
- taskAttempt.reportedStatus.finishTime, "hostname",
- TaskAttemptState.SUCCEEDED.toString(),
- TypeConverter.fromYarn(taskAttempt.getCounters()));
- taskAttempt.eventHandler.handle(new JobHistoryEvent(taskAttempt.attemptId.taskID.jobID, tfe));
- */
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId,
TaskEventType.T_ATTEMPT_SUCCEEDED));
taskAttempt.eventHandler.handle
(new SpeculatorEvent
(taskAttempt.reportedStatus, taskAttempt.clock.getTime()));
-
}
}
@@ -1190,9 +1240,13 @@ public abstract class TaskAttemptImpl im
public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
// set the finish time
taskAttempt.setFinishTime();
+
if (taskAttempt.getLaunchTime() != 0) {
- TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(
- taskAttempt, TaskAttemptState.FAILED);
+ taskAttempt.eventHandler
+ .handle(createJobCounterUpdateEventTAFailed(taskAttempt));
+ TaskAttemptUnsuccessfulCompletionEvent tauce =
+ createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
+ TaskAttemptState.FAILED);
taskAttempt.eventHandler.handle(new JobHistoryEvent(
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
// taskAttempt.logAttemptFinishedEvent(TaskAttemptState.FAILED); Not
@@ -1245,9 +1299,13 @@ public abstract class TaskAttemptImpl im
taskAttempt.addDiagnosticInfo("Too Many fetch failures.Failing the attempt");
//set the finish time
taskAttempt.setFinishTime();
+
if (taskAttempt.getLaunchTime() != 0) {
- TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(
- taskAttempt, TaskAttemptState.FAILED);
+ taskAttempt.eventHandler
+ .handle(createJobCounterUpdateEventTAFailed(taskAttempt));
+ TaskAttemptUnsuccessfulCompletionEvent tauce =
+ createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
+ TaskAttemptState.FAILED);
taskAttempt.eventHandler.handle(new JobHistoryEvent(
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
}else {
@@ -1268,8 +1326,11 @@ public abstract class TaskAttemptImpl im
//set the finish time
taskAttempt.setFinishTime();
if (taskAttempt.getLaunchTime() != 0) {
- TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(
- taskAttempt, TaskAttemptState.KILLED);
+ taskAttempt.eventHandler
+ .handle(createJobCounterUpdateEventTAFailed(taskAttempt));
+ TaskAttemptUnsuccessfulCompletionEvent tauce =
+ createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
+ TaskAttemptState.KILLED);
taskAttempt.eventHandler.handle(new JobHistoryEvent(
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
}else {
Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1150993&r1=1150992&r2=1150993&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Tue Jul 26 06:40:26 2011
@@ -254,7 +254,7 @@ public abstract class TaskImpl implement
this.jobToken = jobToken;
this.metrics = metrics;
- if (completedTasksFromPreviousRun != null
+ if (completedTasksFromPreviousRun != null
&& completedTasksFromPreviousRun.contains(taskId)) {
LOG.info("Task is from previous run " + taskId);
startCount = startCount - 1;
Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1150993&r1=1150992&r2=1150993&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Tue Jul 26 06:40:26 2011
@@ -22,11 +22,15 @@ import java.util.concurrent.atomic.Atomi
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@@ -74,6 +78,15 @@ public class LocalContainerAllocator ext
container.setContainerToken(null);
container.setNodeHttpAddress("localhost:9999");
// send the container-assigned event to task attempt
+
+ if (event.getAttemptID().getTaskId().getTaskType() == TaskType.MAP) {
+ JobCounterUpdateEvent jce =
+ new JobCounterUpdateEvent(event.getAttemptID().getTaskId()
+ .getJobId());
+ // TODO Setting OTHER_LOCAL_MAP for now.
+ jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
+ eventHandler.handle(jce);
+ }
eventHandler.handle(new TaskAttemptContainerAssignedEvent(
event.getAttemptID(), container));
}
Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java?rev=1150993&r1=1150992&r2=1150993&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java Tue Jul 26 06:40:26 2011
@@ -46,6 +46,7 @@ public class ContainerRequestEvent exten
public static ContainerRequestEvent createContainerRequestEventForFailedContainer(
TaskAttemptId attemptID,
Resource capability) {
+ //ContainerRequest for failed events does not consider rack / node locality?
return new ContainerRequestEvent(attemptID, capability);
}
Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1150993&r1=1150992&r2=1150993&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Tue Jul 26 06:40:26 2011
@@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce.v2.app.rm;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -34,12 +36,14 @@ import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AMConstants;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
@@ -456,6 +460,7 @@ public class RMContainerAllocator extend
private final LinkedList<TaskAttemptId> earlierFailedMaps =
new LinkedList<TaskAttemptId>();
+ /** Maps from a host to a list of Map tasks with data on the host */
private final Map<String, LinkedList<TaskAttemptId>> mapsHostMapping =
new HashMap<String, LinkedList<TaskAttemptId>>();
private final Map<String, LinkedList<TaskAttemptId>> mapsRackMapping =
@@ -501,6 +506,18 @@ public class RMContainerAllocator extend
request = new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP);
} else {
for (String host : event.getHosts()) {
+ //host comes from data splitLocations which are hostnames. Containers
+ // use IP addresses.
+ //TODO Temporary fix for locality. Use resolvers from h-common.
+ // Cache to make this more efficient ?
+ InetAddress addr = null;
+ try {
+ addr = InetAddress.getByName(host);
+ } catch (UnknownHostException e) {
+ LOG.warn("Unable to resolve host to IP for host [: " + host + "]");
+ }
+ if (addr != null) //Fallback to host if resolve fails.
+ host = addr.getHostAddress();
LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
if (list == null) {
list = new LinkedList<TaskAttemptId>();
@@ -585,6 +602,7 @@ public class RMContainerAllocator extend
//try to assign to earlierFailedMaps if present
assigned = assignToFailedMap(allocated);
+ //Assign to reduces before assigning to maps ?
if (assigned == null) {
assigned = assignToReduce(allocated);
}
@@ -606,6 +624,10 @@ public class RMContainerAllocator extend
TaskAttemptId tId = earlierFailedMaps.removeFirst();
if (maps.containsKey(tId)) {
assigned = maps.remove(tId);
+ JobCounterUpdateEvent jce =
+ new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
+ jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
+ eventHandler.handle(jce);
LOG.info("Assigned from earlierFailedMaps");
break;
}
@@ -638,6 +660,10 @@ public class RMContainerAllocator extend
TaskAttemptId tId = list.removeFirst();
if (maps.containsKey(tId)) {
assigned = maps.remove(tId);
+ JobCounterUpdateEvent jce =
+ new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
+ jce.addCounterUpdate(JobCounter.DATA_LOCAL_MAPS, 1);
+ eventHandler.handle(jce);
hostLocalAssigned++;
LOG.info("Assigned based on host match " + host);
break;
@@ -650,6 +676,10 @@ public class RMContainerAllocator extend
TaskAttemptId tId = list.removeFirst();
if (maps.containsKey(tId)) {
assigned = maps.remove(tId);
+ JobCounterUpdateEvent jce =
+ new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
+ jce.addCounterUpdate(JobCounter.RACK_LOCAL_MAPS, 1);
+ eventHandler.handle(jce);
rackLocalAssigned++;
LOG.info("Assigned based on rack match " + rack);
break;
@@ -658,6 +688,10 @@ public class RMContainerAllocator extend
if (assigned == null && maps.size() > 0) {
TaskAttemptId tId = maps.keySet().iterator().next();
assigned = maps.remove(tId);
+ JobCounterUpdateEvent jce =
+ new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
+ jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
+ eventHandler.handle(jce);
LOG.info("Assigned based on * match");
break;
}
Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1150993&r1=1150992&r2=1150993&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Tue Jul 26 06:40:26 2011
@@ -469,7 +469,9 @@ public class TestRMContainerAllocator {
return new EventHandler() {
@Override
public void handle(Event event) {
- events.add((TaskAttemptContainerAssignedEvent) event);
+ if (event instanceof TaskAttemptContainerAssignedEvent) {
+ events.add((TaskAttemptContainerAssignedEvent) event);
+ } //Ignoring JobCounterUpdateEvents
}
};
}
Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/Counters.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/Counters.java?rev=1150993&r1=1150992&r2=1150993&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/Counters.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/Counters.java Tue Jul 26 06:40:26 2011
@@ -11,4 +11,6 @@ public interface Counters {
public abstract void setCounterGroup(String key, CounterGroup value);
public abstract void removeCounterGroup(String key);
public abstract void clearCounterGroups();
+
+ public abstract void incrCounter(Enum<?> key, long amount);
}
Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/CountersPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/CountersPBImpl.java?rev=1150993&r1=1150992&r2=1150993&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/CountersPBImpl.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/CountersPBImpl.java Tue Jul 26 06:40:26 2011
@@ -78,6 +78,26 @@ public class CountersPBImpl extends Prot
CounterGroup group = getCounterGroup(key.getDeclaringClass().getName());
return group == null ? null : group.getCounter(key.name());
}
+
+ @Override
+ public void incrCounter(Enum<?> key, long amount) {
+ String groupName = key.getDeclaringClass().getName();
+ if (getCounterGroup(groupName) == null) {
+ CounterGroup cGrp = new CounterGroupPBImpl();
+ cGrp.setName(groupName);
+ cGrp.setDisplayName(groupName);
+ setCounterGroup(groupName, cGrp);
+ }
+ if (getCounterGroup(groupName).getCounter(key.name()) == null) {
+ Counter c = new CounterPBImpl();
+ c.setName(key.name());
+ c.setDisplayName(key.name());
+ c.setValue(0l);
+ getCounterGroup(groupName).setCounter(key.name(), c);
+ }
+ Counter counter = getCounterGroup(groupName).getCounter(key.name());
+ counter.setValue(counter.getValue() + amount);
+ }
private void initCounterGroups() {
if (this.counterGroups != null) {
Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java?rev=1150993&r1=1150992&r2=1150993&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java Tue Jul 26 06:40:26 2011
@@ -48,7 +48,9 @@ import org.apache.hadoop.fs.permission.F
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Mapper;
@@ -124,7 +126,6 @@ public class TestMRJobs {
@Test
public void testSleepJob() throws IOException, InterruptedException,
ClassNotFoundException {
-
LOG.info("\n\n\nStarting testSleepJob().");
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
@@ -148,17 +149,35 @@ public class TestMRJobs {
boolean succeeded = job.waitForCompletion(true);
Assert.assertTrue(succeeded);
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
-
+ verifySleepJobCounters(job);
+
+
// TODO later: add explicit "isUber()" checks of some sort (extend
// JobStatus?)--compare against MRJobConfig.JOB_UBERTASK_ENABLE value
}
+ protected void verifySleepJobCounters(Job job) throws InterruptedException,
+ IOException {
+ Counters counters = job.getCounters();
+ Assert.assertEquals(3, counters.findCounter(JobCounter.OTHER_LOCAL_MAPS)
+ .getValue());
+ Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
+ .getValue());
+ Assert.assertEquals(2,
+ counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue());
+ Assert
+ .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
+ && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
+ Assert
+ .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
+ && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
+ }
+
@Test
public void testRandomWriter() throws IOException, InterruptedException,
ClassNotFoundException {
-
+
LOG.info("\n\n\nStarting testRandomWriter().");
-
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");
@@ -169,8 +188,8 @@ public class TestMRJobs {
mrCluster.getConfig().set(RandomTextWriterJob.TOTAL_BYTES, "3072");
mrCluster.getConfig().set(RandomTextWriterJob.BYTES_PER_MAP, "1024");
Job job = randomWriterJob.createJob(mrCluster.getConfig());
- Path outputDir = new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
- "random-output");
+ Path outputDir =
+ new Path(mrCluster.getTestWorkDir().getAbsolutePath(), "random-output");
FileOutputFormat.setOutputPath(job, outputDir);
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
job.setJarByClass(RandomTextWriterJob.class);
@@ -179,6 +198,7 @@ public class TestMRJobs {
Assert.assertTrue(succeeded);
Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState());
// Make sure there are three files in the output-dir
+
RemoteIterator<FileStatus> iterator =
FileContext.getFileContext(mrCluster.getConfig()).listStatus(
outputDir);
@@ -191,9 +211,22 @@ public class TestMRJobs {
}
}
Assert.assertEquals("Number of part files is wrong!", 3, count);
+ verifyRandomWriterCounters(job);
// TODO later: add explicit "isUber()" checks of some sort
}
+
+ protected void verifyRandomWriterCounters(Job job)
+ throws InterruptedException, IOException {
+ Counters counters = job.getCounters();
+ Assert.assertEquals(3, counters.findCounter(JobCounter.OTHER_LOCAL_MAPS)
+ .getValue());
+ Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
+ .getValue());
+ Assert
+ .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
+ && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
+ }
@Test
public void testFailingMapper() throws IOException, InterruptedException,
@@ -227,9 +260,24 @@ public class TestMRJobs {
Assert.assertEquals(TaskCompletionEvent.Status.FAILED,
events[1].getStatus().FAILED);
Assert.assertEquals(JobStatus.State.FAILED, job.getJobState());
+ verifyFailingMapperCounters(job);
// TODO later: add explicit "isUber()" checks of some sort
}
+
+ protected void verifyFailingMapperCounters(Job job)
+ throws InterruptedException, IOException {
+ Counters counters = job.getCounters();
+ Assert.assertEquals(2, counters.findCounter(JobCounter.OTHER_LOCAL_MAPS)
+ .getValue());
+ Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
+ .getValue());
+ Assert.assertEquals(2, counters.findCounter(JobCounter.NUM_FAILED_MAPS)
+ .getValue());
+ Assert
+ .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
+ && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
+ }
protected Job runFailingMapperJob()
throws IOException, InterruptedException, ClassNotFoundException {
Modified: hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java?rev=1150993&r1=1150992&r2=1150993&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java (original)
+++ hadoop/common/branches/MR-279/mapreduce/mr-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestUberAM.java Tue Jul 26 06:40:26 2011
@@ -25,7 +25,9 @@ import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -55,6 +57,32 @@ public class TestUberAM extends TestMRJo
}
super.testSleepJob();
}
+
+ @Override
+ protected void verifySleepJobCounters(Job job) throws InterruptedException,
+ IOException {
+ Counters counters = job.getCounters();
+
+ Assert.assertEquals(3, counters.findCounter(JobCounter.OTHER_LOCAL_MAPS)
+ .getValue());
+ Assert.assertEquals(3, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
+ .getValue());
+ Assert.assertEquals(1,
+ counters.findCounter(JobCounter.TOTAL_LAUNCHED_REDUCES).getValue());
+ Assert
+ .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
+ && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
+ Assert
+ .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
+ && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
+
+ Assert.assertEquals(3, counters.findCounter(JobCounter.NUM_UBER_SUBMAPS)
+ .getValue());
+ Assert.assertEquals(1, counters.findCounter(JobCounter.NUM_UBER_SUBREDUCES)
+ .getValue());
+ Assert.assertEquals(4,
+ counters.findCounter(JobCounter.TOTAL_LAUNCHED_UBERTASKS).getValue());
+ }
@Override
public void testRandomWriter()
@@ -63,6 +91,17 @@ public class TestUberAM extends TestMRJo
}
@Override
+ protected void verifyRandomWriterCounters(Job job)
+ throws InterruptedException, IOException {
+ super.verifyRandomWriterCounters(job);
+ Counters counters = job.getCounters();
+ Assert.assertEquals(3, counters.findCounter(JobCounter.NUM_UBER_SUBMAPS)
+ .getValue());
+ Assert.assertEquals(3,
+ counters.findCounter(JobCounter.TOTAL_LAUNCHED_UBERTASKS).getValue());
+ }
+
+ @Override
public void testFailingMapper()
throws IOException, InterruptedException, ClassNotFoundException {
LOG.info("\n\n\nStarting uberized testFailingMapper().");
@@ -100,9 +139,34 @@ public class TestUberAM extends TestMRJo
Assert.assertEquals(TaskCompletionEvent.Status.FAILED,
events[0].getStatus().FAILED);
Assert.assertEquals(JobStatus.State.FAILED, job.getJobState());
+
+ //Disabling till UberAM honors MRJobConfig.MAP_MAX_ATTEMPTS
+ //verifyFailingMapperCounters(job);
// TODO later: add explicit "isUber()" checks of some sort
}
+
+ @Override
+ protected void verifyFailingMapperCounters(Job job)
+ throws InterruptedException, IOException {
+ Counters counters = job.getCounters();
+ Assert.assertEquals(2, counters.findCounter(JobCounter.OTHER_LOCAL_MAPS)
+ .getValue());
+ Assert.assertEquals(2, counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS)
+ .getValue());
+ Assert.assertEquals(2, counters.findCounter(JobCounter.NUM_FAILED_MAPS)
+ .getValue());
+ Assert
+ .assertTrue(counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS) != null
+ && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
+
+ Assert.assertEquals(2,
+ counters.findCounter(JobCounter.TOTAL_LAUNCHED_UBERTASKS).getValue());
+ Assert.assertEquals(2, counters.findCounter(JobCounter.NUM_UBER_SUBMAPS)
+ .getValue());
+ Assert.assertEquals(2, counters
+ .findCounter(JobCounter.NUM_FAILED_UBERTASKS).getValue());
+ }
//@Test //FIXME: if/when the corresponding TestMRJobs test gets enabled, do so here as well (potentially with mods for ubermode)
public void testSleepJobWithSecurityOn()