You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by su...@apache.org on 2013/01/29 22:35:27 UTC
svn commit: r1440155 - in
/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project: ./ conf/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/
hadoop-mapreduce-client/hadoop-mapreduc...
Author: suresh
Date: Tue Jan 29 21:35:24 2013
New Revision: 1440155
URL: http://svn.apache.org/viewvc?rev=1440155&view=rev
Log:
Merging trunk to branch-trunk-win
Added:
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/Avataar.java
- copied unchanged from r1440147, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/Avataar.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/Locality.java
- copied unchanged from r1440147, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/Locality.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
- copied unchanged from r1440147, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
Removed:
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java
Modified:
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/ (props changed)
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt (contents, props changed)
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/conf/ (props changed)
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (props changed)
Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1438241-1440147
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt?rev=1440155&r1=1440154&r2=1440155&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt Tue Jan 29 21:35:24 2013
@@ -11,16 +11,9 @@ Trunk (Unreleased)
MAPREDUCE-2669. Add new examples for Mean, Median, and Standard Deviation.
(Plamen Jeliazkov via shv)
- MAPREDUCE-4049. Experimental api to allow for alternate shuffle plugins.
- (Avner BenHanoch via acmurthy)
-
- MAPREDUCE-4807. Allow MapOutputBuffer to be pluggable. (masokan via tucu)
-
MAPREDUCE-4887. Add RehashPartitioner, to smooth distributions
with poor implementations of Object#hashCode(). (Radim Kolar via cutting)
- MAPREDUCE-4808. Refactor MapOutput and MergeManager to facilitate reuse by Shuffle implementations. (masokan via tucu)
-
IMPROVEMENTS
MAPREDUCE-3787. [Gridmix] Optimize job monitoring and STRESS mode for
@@ -78,9 +71,6 @@ Trunk (Unreleased)
MAPREDUCE-4735. Make arguments in TestDFSIO case insensitive.
(Brandon Li via suresh)
- MAPREDUCE-4809. Change visibility of classes for pluggable sort changes.
- (masokan via tucu)
-
BUG FIXES
MAPREDUCE-4272. SortedRanges.Range#compareTo is not spec compliant.
@@ -180,6 +170,14 @@ Release 2.0.3-alpha - Unreleased
MAPREDUCE-4810. Added new admin command options for MR AM. (Jerry Chen via
vinodkv)
+ MAPREDUCE-4049. Experimental api to allow for alternate shuffle plugins.
+ (Avner BenHanoch via acmurthy)
+
+ MAPREDUCE-4807. Allow MapOutputBuffer to be pluggable. (masokan via tucu)
+
+ MAPREDUCE-4808. Refactor MapOutput and MergeManager to facilitate reuse
+ by Shuffle implementations. (masokan via tucu)
+
IMPROVEMENTS
MAPREDUCE-3678. The Map tasks logs should have the value of input
@@ -211,6 +209,12 @@ Release 2.0.3-alpha - Unreleased
MAPREDUCE-4949. Enable multiple pi jobs to run in parallel. (sandyr via tucu)
+ MAPREDUCE-4809. Change visibility of classes for pluggable sort changes.
+ (masokan via tucu)
+
+ MAPREDUCE-4838. Add additional fields like Locality, Avataar to the
+ JobHistory logs. (Zhijie Shen via sseth)
+
OPTIMIZATIONS
BUG FIXES
@@ -268,6 +272,12 @@ Release 2.0.3-alpha - Unreleased
MAPREDUCE-4948. Fix a failing unit test TestYARNRunner.testHistoryServerToken.
(Junping Du via sseth)
+ MAPREDUCE-4803. Remove duplicate copy of TestIndexCache. (Mariappan Asokan
+ via sseth)
+
+ MAPREDUCE-2264. Job status exceeds 100% in some cases.
+ (devaraj.k and sandyr via tucu)
+
Release 2.0.2-alpha - 2012-09-07
INCOMPATIBLE CHANGES
Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1438241-1440147
Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1438241-1440147
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1440155&r1=1440154&r2=1440155&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Tue Jan 29 21:35:24 2013
@@ -28,6 +28,7 @@ import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@@ -1192,6 +1193,39 @@ public class JobImpl implements org.apac
}
}
*/
+ /**
+ * Get the workflow adjacencies from the job conf
+ * The string returned is of the form "key"="value" "key"="value" ...
+ */
+ private static String getWorkflowAdjacencies(Configuration conf) {
+ int prefixLen = MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING.length();
+ Map<String,String> adjacencies =
+ conf.getValByRegex(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_PATTERN);
+ if (adjacencies.isEmpty()) {
+ return "";
+ }
+ int size = 0;
+ for (Entry<String,String> entry : adjacencies.entrySet()) {
+ int keyLen = entry.getKey().length();
+ size += keyLen - prefixLen;
+ size += entry.getValue().length() + 6;
+ }
+ StringBuilder sb = new StringBuilder(size);
+ for (Entry<String,String> entry : adjacencies.entrySet()) {
+ int keyLen = entry.getKey().length();
+ sb.append("\"");
+ sb.append(escapeString(entry.getKey().substring(prefixLen, keyLen)));
+ sb.append("\"=\"");
+ sb.append(escapeString(entry.getValue()));
+ sb.append("\" ");
+ }
+ return sb.toString();
+ }
+
+ public static String escapeString(String data) {
+ return StringUtils.escapeString(data, StringUtils.ESCAPE_CHAR,
+ new char[] {'"', '=', '.'});
+ }
public static class InitTransition
implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
@@ -1217,7 +1251,11 @@ public class JobImpl implements org.apac
job.conf.get(MRJobConfig.USER_NAME, "mapred"),
job.appSubmitTime,
job.remoteJobConfFile.toString(),
- job.jobACLs, job.queueName);
+ job.jobACLs, job.queueName,
+ job.conf.get(MRJobConfig.WORKFLOW_ID, ""),
+ job.conf.get(MRJobConfig.WORKFLOW_NAME, ""),
+ job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""),
+ getWorkflowAdjacencies(job.conf));
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
//TODO JH Verify jobACLs, UserName via UGI?
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1440155&r1=1440154&r2=1440155&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Tue Jan 29 21:35:24 2013
@@ -66,6 +66,8 @@ import org.apache.hadoop.mapreduce.jobhi
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.api.records.Avataar;
+import org.apache.hadoop.mapreduce.v2.api.records.Locality;
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
@@ -156,7 +158,8 @@ public abstract class TaskAttemptImpl im
private final org.apache.hadoop.mapred.JobID oldJobId;
private final TaskAttemptListener taskAttemptListener;
private final Resource resourceCapability;
- private final String[] dataLocalHosts;
+ protected Set<String> dataLocalHosts;
+ protected Set<String> dataLocalRacks;
private final List<String> diagnostics = new ArrayList<String>();
private final Lock readLock;
private final Lock writeLock;
@@ -175,6 +178,8 @@ public abstract class TaskAttemptImpl im
private int shufflePort = -1;
private String trackerName;
private int httpPort;
+ private Locality locality;
+ private Avataar avataar;
private static final CleanupContainerTransition CLEANUP_CONTAINER_TRANSITION =
new CleanupContainerTransition();
@@ -532,8 +537,16 @@ public abstract class TaskAttemptImpl im
getMemoryRequired(conf, taskId.getTaskType()));
this.resourceCapability.setVirtualCores(
getCpuRequired(conf, taskId.getTaskType()));
- this.dataLocalHosts = dataLocalHosts;
+
+ this.dataLocalHosts = resolveHosts(dataLocalHosts);
RackResolver.init(conf);
+ this.dataLocalRacks = new HashSet<String>();
+ for (String host : this.dataLocalHosts) {
+ this.dataLocalRacks.add(RackResolver.resolve(host).getNetworkLocation());
+ }
+
+ locality = Locality.OFF_SWITCH;
+ avataar = Avataar.VIRGIN;
// This "this leak" is okay because the retained pointer is in an
// instance variable.
@@ -1032,6 +1045,23 @@ public abstract class TaskAttemptImpl im
}
}
+ public Locality getLocality() {
+ return locality;
+ }
+
+ public void setLocality(Locality locality) {
+ this.locality = locality;
+ }
+
+ public Avataar getAvataar()
+ {
+ return avataar;
+ }
+
+ public void setAvataar(Avataar avataar) {
+ this.avataar = avataar;
+ }
+
private static TaskAttemptState getExternalState(
TaskAttemptStateInternal smState) {
switch (smState) {
@@ -1232,25 +1262,27 @@ public abstract class TaskAttemptImpl im
taskAttempt.attemptId,
taskAttempt.resourceCapability));
} else {
- Set<String> racks = new HashSet<String>();
- for (String host : taskAttempt.dataLocalHosts) {
- racks.add(RackResolver.resolve(host).getNetworkLocation());
- }
taskAttempt.eventHandler.handle(new ContainerRequestEvent(
- taskAttempt.attemptId, taskAttempt.resourceCapability, taskAttempt
- .resolveHosts(taskAttempt.dataLocalHosts), racks
- .toArray(new String[racks.size()])));
+ taskAttempt.attemptId, taskAttempt.resourceCapability,
+ taskAttempt.dataLocalHosts.toArray(
+ new String[taskAttempt.dataLocalHosts.size()]),
+ taskAttempt.dataLocalRacks.toArray(
+ new String[taskAttempt.dataLocalRacks.size()])));
}
}
}
- protected String[] resolveHosts(String[] src) {
- String[] result = new String[src.length];
- for (int i = 0; i < src.length; i++) {
- if (isIP(src[i])) {
- result[i] = resolveHost(src[i]);
- } else {
- result[i] = src[i];
+ protected Set<String> resolveHosts(String[] src) {
+ Set<String> result = new HashSet<String>();
+ if (src != null) {
+ for (int i = 0; i < src.length; i++) {
+ if (src[i] == null) {
+ continue;
+ } else if (isIP(src[i])) {
+ result.add(resolveHost(src[i]));
+ } else {
+ result.add(src[i]);
+ }
}
}
return result;
@@ -1300,6 +1332,20 @@ public abstract class TaskAttemptImpl im
taskAttempt.remoteTask.isMapTask(), taskAttempt.containerID.getId());
taskAttempt.taskAttemptListener.registerPendingTask(
taskAttempt.remoteTask, taskAttempt.jvmID);
+
+ taskAttempt.locality = Locality.OFF_SWITCH;
+ if (taskAttempt.dataLocalHosts.size() > 0) {
+ String cHost = taskAttempt.resolveHost(
+ taskAttempt.containerNodeId.getHost());
+ if (taskAttempt.dataLocalHosts.contains(cHost)) {
+ taskAttempt.locality = Locality.NODE_LOCAL;
+ }
+ }
+ if (taskAttempt.locality == Locality.OFF_SWITCH) {
+ if (taskAttempt.dataLocalRacks.contains(taskAttempt.nodeRackName)) {
+ taskAttempt.locality = Locality.RACK_LOCAL;
+ }
+ }
//launch the container
//create the container object to be launched for a given Task attempt
@@ -1376,7 +1422,7 @@ public abstract class TaskAttemptImpl im
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
} else {
LOG.debug("Not generating HistoryFinish event since start event not " +
- "generated for taskAttempt: " + taskAttempt.getID());
+ "generated for taskAttempt: " + taskAttempt.getID());
}
}
}
@@ -1421,7 +1467,8 @@ public abstract class TaskAttemptImpl im
TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
taskAttempt.launchTime,
nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort(),
- taskAttempt.shufflePort, taskAttempt.containerID);
+ taskAttempt.shufflePort, taskAttempt.containerID,
+ taskAttempt.locality.toString(), taskAttempt.avataar.toString());
taskAttempt.eventHandler.handle
(new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tase));
taskAttempt.eventHandler.handle
@@ -1510,7 +1557,7 @@ public abstract class TaskAttemptImpl im
// handling failed map/reduce events.
}else {
LOG.debug("Not generating HistoryFinish event since start event not " +
- "generated for taskAttempt: " + taskAttempt.getID());
+ "generated for taskAttempt: " + taskAttempt.getID());
}
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
@@ -1580,7 +1627,7 @@ public abstract class TaskAttemptImpl im
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
}else {
LOG.debug("Not generating HistoryFinish event since start event not " +
- "generated for taskAttempt: " + taskAttempt.getID());
+ "generated for taskAttempt: " + taskAttempt.getID());
}
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
@@ -1648,7 +1695,7 @@ public abstract class TaskAttemptImpl im
taskAttempt.attemptId.getTaskId().getJobId(), tauce));
}else {
LOG.debug("Not generating HistoryFinish event since start event not " +
- "generated for taskAttempt: " + taskAttempt.getID());
+ "generated for taskAttempt: " + taskAttempt.getID());
}
// taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.KILLED); Not logging Map/Reduce attempts in case of failure.
taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1440155&r1=1440154&r2=1440155&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Tue Jan 29 21:35:24 2013
@@ -46,6 +46,7 @@ import org.apache.hadoop.mapreduce.jobhi
import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.api.records.Avataar;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
@@ -594,8 +595,9 @@ public abstract class TaskImpl implement
}
// This is always called in the Write Lock
- private void addAndScheduleAttempt() {
+ private void addAndScheduleAttempt(Avataar avataar) {
TaskAttempt attempt = createAttempt();
+ ((TaskAttemptImpl) attempt).setAvataar(avataar);
if (LOG.isDebugEnabled()) {
LOG.debug("Created attempt " + attempt.getID());
}
@@ -749,7 +751,7 @@ public abstract class TaskImpl implement
@Override
public void transition(TaskImpl task, TaskEvent event) {
- task.addAndScheduleAttempt();
+ task.addAndScheduleAttempt(Avataar.VIRGIN);
task.scheduledTime = task.clock.getTime();
TaskStartedEvent tse = new TaskStartedEvent(
TypeConverter.fromYarn(task.taskId), task.getLaunchTime(),
@@ -772,7 +774,7 @@ public abstract class TaskImpl implement
@Override
public void transition(TaskImpl task, TaskEvent event) {
LOG.info("Scheduling a redundant attempt for task " + task.taskId);
- task.addAndScheduleAttempt();
+ task.addAndScheduleAttempt(Avataar.SPECULATIVE);
}
}
@@ -849,7 +851,7 @@ public abstract class TaskImpl implement
task.finishedAttempts.add(taskAttemptId);
task.inProgressAttempts.remove(taskAttemptId);
if (task.successfulAttempt == null) {
- task.addAndScheduleAttempt();
+ task.addAndScheduleAttempt(Avataar.VIRGIN);
}
}
}
@@ -937,7 +939,7 @@ public abstract class TaskImpl implement
task.inProgressAttempts.remove(taskAttemptId);
if (task.inProgressAttempts.size() == 0
&& task.successfulAttempt == null) {
- task.addAndScheduleAttempt();
+ task.addAndScheduleAttempt(Avataar.VIRGIN);
}
} else {
task.handleTaskAttemptCompletion(
@@ -1053,7 +1055,7 @@ public abstract class TaskImpl implement
// from the map splitInfo. So the bad node might be sent as a location
// to the RM. But the RM would ignore that just like it would ignore
// currently pending container requests affinitized to bad nodes.
- task.addAndScheduleAttempt();
+ task.addAndScheduleAttempt(Avataar.VIRGIN);
return TaskStateInternal.SCHEDULED;
}
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1440155&r1=1440154&r2=1440155&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Tue Jan 29 21:35:24 2013
@@ -67,7 +67,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.RackResolver;
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1440155&r1=1440154&r2=1440155&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Tue Jan 29 21:35:24 2013
@@ -33,6 +33,9 @@ import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.jobhistory.EventType;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.mapreduce.MRConfig;
@@ -66,6 +69,7 @@ import org.apache.hadoop.yarn.SystemCloc
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
@@ -105,6 +109,13 @@ public class TestJobImpl {
Configuration conf = new Configuration();
conf.setInt(MRJobConfig.NUM_REDUCES, 0);
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+ conf.set(MRJobConfig.WORKFLOW_ID, "testId");
+ conf.set(MRJobConfig.WORKFLOW_NAME, "testName");
+ conf.set(MRJobConfig.WORKFLOW_NODE_NAME, "testNodeName");
+ conf.set(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING + "key1", "value1");
+ conf.set(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING + "key2", "value2");
+
+
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(conf);
dispatcher.start();
@@ -114,6 +125,9 @@ public class TestJobImpl {
commitHandler.init(conf);
commitHandler.start();
+ JobSubmittedEventHandler jseHandler = new JobSubmittedEventHandler("testId",
+ "testName", "testNodeName", "\"key2\"=\"value2\" \"key1\"=\"value1\" ");
+ dispatcher.register(EventType.class, jseHandler);
JobImpl job = createStubbedJob(conf, dispatcher, 0);
job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
assertJobState(job, JobStateInternal.INITED);
@@ -121,6 +135,11 @@ public class TestJobImpl {
assertJobState(job, JobStateInternal.SUCCEEDED);
dispatcher.stop();
commitHandler.stop();
+ try {
+ Assert.assertTrue(jseHandler.getAssertValue());
+ } catch (InterruptedException e) {
+ Assert.fail("Workflow related attributes are not tested properly");
+ }
}
@Test(timeout=20000)
@@ -614,6 +633,67 @@ public class TestJobImpl {
Assert.assertEquals(state, job.getInternalState());
}
+ private static class JobSubmittedEventHandler implements
+ EventHandler<JobHistoryEvent> {
+
+ private String workflowId;
+
+ private String workflowName;
+
+ private String workflowNodeName;
+
+ private String workflowAdjacencies;
+
+ private Boolean assertBoolean;
+
+ public JobSubmittedEventHandler(String workflowId, String workflowName,
+ String workflowNodeName, String workflowAdjacencies) {
+ this.workflowId = workflowId;
+ this.workflowName = workflowName;
+ this.workflowNodeName = workflowNodeName;
+ this.workflowAdjacencies = workflowAdjacencies;
+ assertBoolean = null;
+ }
+
+ @Override
+ public void handle(JobHistoryEvent jhEvent) {
+ if (jhEvent.getType() != EventType.JOB_SUBMITTED) {
+ return;
+ }
+ JobSubmittedEvent jsEvent = (JobSubmittedEvent) jhEvent.getHistoryEvent();
+ if (!workflowId.equals(jsEvent.getWorkflowId())) {
+ setAssertValue(false);
+ return;
+ }
+ if (!workflowName.equals(jsEvent.getWorkflowName())) {
+ setAssertValue(false);
+ return;
+ }
+ if (!workflowNodeName.equals(jsEvent.getWorkflowNodeName())) {
+ setAssertValue(false);
+ return;
+ }
+ if (!workflowAdjacencies.equals(jsEvent.getWorkflowAdjacencies())) {
+ setAssertValue(false);
+ return;
+ }
+ setAssertValue(true);
+ }
+
+ private synchronized void setAssertValue(Boolean bool) {
+ assertBoolean = bool;
+ notify();
+ }
+
+ public synchronized boolean getAssertValue() throws InterruptedException {
+ while (assertBoolean == null) {
+ wait();
+ }
+ return assertBoolean;
+ }
+
+ }
+
private static class StubbedJob extends JobImpl {
//override the init transition
private final InitTransition initTransition;
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java?rev=1440155&r1=1440154&r2=1440155&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java Tue Jan 29 21:35:24 2013
@@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.jobhi
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.Locality;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
@@ -157,6 +158,7 @@ public class TestTaskAttempt{
createMapTaskAttemptImplForTest(eventHandler, splitInfo);
TaskAttemptImpl spyTa = spy(mockTaskAttempt);
when(spyTa.resolveHost(hosts[0])).thenReturn("host1");
+ spyTa.dataLocalHosts = spyTa.resolveHosts(splitInfo.getLocations());
TaskAttemptEvent mockTAEvent = mock(TaskAttemptEvent.class);
rct.transition(spyTa, mockTAEvent);
@@ -360,6 +362,8 @@ public class TestTaskAttempt{
taImpl.handle(new TaskAttemptEvent(attemptId,
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
assertFalse(eventHandler.internalError);
+ assertEquals("Task attempt is not assigned on the local node",
+ Locality.NODE_LOCAL, taImpl.getLocality());
}
@Test
@@ -398,7 +402,7 @@ public class TestTaskAttempt{
mock(Token.class), new Credentials(),
new SystemClock(), appCtx);
- NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+ NodeId nid = BuilderUtils.newNodeId("127.0.0.2", 0);
ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
Container container = mock(Container.class);
when(container.getId()).thenReturn(contId);
@@ -416,6 +420,8 @@ public class TestTaskAttempt{
TaskAttemptEventType.TA_CONTAINER_CLEANED));
assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
eventHandler.internalError);
+ assertEquals("Task attempt is not assigned on the local rack",
+ Locality.RACK_LOCAL, taImpl.getLocality());
}
@Test
@@ -439,7 +445,7 @@ public class TestTaskAttempt{
jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
- when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
+ when(splits.getLocations()).thenReturn(new String[] {});
AppContext appCtx = mock(AppContext.class);
ClusterInfo clusterInfo = mock(ClusterInfo.class);
@@ -475,6 +481,8 @@ public class TestTaskAttempt{
TaskAttemptEventType.TA_CONTAINER_CLEANED));
assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
eventHandler.internalError);
+ assertEquals("Task attempt is assigned locally", Locality.OFF_SWITCH,
+ taImpl.getLocality());
}
@Test
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1440155&r1=1440154&r2=1440155&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Tue Jan 29 21:35:24 2013
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapred.TaskUmbi
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.Avataar;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
@@ -46,10 +47,12 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
@@ -254,6 +257,7 @@ public class TestTaskImpl {
mockTask.handle(new TaskEvent(taskId,
TaskEventType.T_SCHEDULE));
assertTaskScheduledState();
+ assertTaskAttemptAvataar(Avataar.VIRGIN);
}
private void killTask(TaskId taskId) {
@@ -338,6 +342,19 @@ public class TestTaskImpl {
private void assertTaskSucceededState() {
assertEquals(TaskState.SUCCEEDED, mockTask.getState());
}
+
+ /**
+ * {@link Avataar}
+ */
+ private void assertTaskAttemptAvataar(Avataar avataar) {
+ for (TaskAttempt taskAttempt : mockTask.getAttempts().values()) {
+ if (((TaskAttemptImpl) taskAttempt).getAvataar() == avataar) {
+ return;
+ }
+ }
+ fail("There is no " + (avataar == Avataar.VIRGIN ? "virgin" : "speculative")
+ + "task attempt");
+ }
@Test
public void testInit() {
@@ -516,6 +533,9 @@ public class TestTaskImpl {
// The task should still be in the succeeded state
assertTaskSucceededState();
+
+ // The task should contain a speculative task attempt
+ assertTaskAttemptAvataar(Avataar.SPECULATIVE);
}
@Test
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr?rev=1440155&r1=1440154&r2=1440155&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr Tue Jan 29 21:35:24 2013
@@ -91,7 +91,11 @@
"values": "string"
}
},
- {"name": "jobQueueName", "type": "string"}
+ {"name": "jobQueueName", "type": "string"},
+ {"name": "workflowId", "type": "string"},
+ {"name": "workflowName", "type": "string"},
+ {"name": "workflowNodeName", "type": "string"},
+ {"name": "workflowAdjacencies", "type": "string"}
]
},
@@ -191,7 +195,9 @@
{"name": "trackerName", "type": "string"},
{"name": "httpPort", "type": "int"},
{"name": "shufflePort", "type": "int"},
- {"name": "containerId", "type": "string"}
+ {"name": "containerId", "type": "string"},
+ {"name": "locality", "type": "string"},
+ {"name": "avataar", "type": "string"}
]
},
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java?rev=1440155&r1=1440154&r2=1440155&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java Tue Jan 29 21:35:24 2013
@@ -218,6 +218,7 @@ public class Merger {
CompressionCodec codec = null;
long segmentOffset = 0;
long segmentLength = -1;
+ long rawDataLength = -1;
Counters.Counter mapOutputsCounter = null;
@@ -234,6 +235,15 @@ public class Merger {
this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve,
mergedMapOutputsCounter);
}
+
+ public Segment(Configuration conf, FileSystem fs, Path file,
+ CompressionCodec codec, boolean preserve,
+ Counters.Counter mergedMapOutputsCounter, long rawDataLength)
+ throws IOException {
+ this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve,
+ mergedMapOutputsCounter);
+ this.rawDataLength = rawDataLength;
+ }
public Segment(Configuration conf, FileSystem fs, Path file,
long segmentOffset, long segmentLength,
@@ -261,6 +271,11 @@ public class Merger {
public Segment(Reader<K, V> reader, boolean preserve) {
this(reader, preserve, null);
}
+
+ public Segment(Reader<K, V> reader, boolean preserve, long rawDataLength) {
+ this(reader, preserve, null);
+ this.rawDataLength = rawDataLength;
+ }
public Segment(Reader<K, V> reader, boolean preserve,
Counters.Counter mapOutputsCounter) {
@@ -300,6 +315,10 @@ public class Merger {
segmentLength : reader.getLength();
}
+ public long getRawDataLength() {
+ return (rawDataLength > 0) ? rawDataLength : getLength();
+ }
+
boolean nextRawKey() throws IOException {
return reader.nextRawKey(key);
}
@@ -633,7 +652,7 @@ public class Merger {
totalBytesProcessed = 0;
totalBytes = 0;
for (int i = 0; i < segmentsToMerge.size(); i++) {
- totalBytes += segmentsToMerge.get(i).getLength();
+ totalBytes += segmentsToMerge.get(i).getRawDataLength();
}
}
if (totalBytes != 0) //being paranoid
@@ -702,7 +721,7 @@ public class Merger {
// size will match(almost) if combiner is not called in merge.
long inputBytesOfThisMerge = totalBytesProcessed -
bytesProcessedInPrevMerges;
- totalBytes -= inputBytesOfThisMerge - tempSegment.getLength();
+ totalBytes -= inputBytesOfThisMerge - tempSegment.getRawDataLength();
if (totalBytes != 0) {
progPerByte = 1.0f / (float)totalBytes;
}
@@ -768,7 +787,7 @@ public class Merger {
for (int i = 0; i < numSegments; i++) {
// Not handling empty segments here assuming that it would not affect
// much in calculation of mergeProgress.
- segmentSizes.add(segments.get(i).getLength());
+ segmentSizes.add(segments.get(i).getRawDataLength());
}
// If includeFinalMerge is true, allow the following while loop iterate
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1440155&r1=1440154&r2=1440155&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Tue Jan 29 21:35:24 2013
@@ -647,5 +647,18 @@ public interface MRJobConfig {
"$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*",
"$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*",
};
+
+ public static final String WORKFLOW_ID = "mapreduce.workflow.id";
+
+ public static final String WORKFLOW_NAME = "mapreduce.workflow.name";
+
+ public static final String WORKFLOW_NODE_NAME =
+ "mapreduce.workflow.node.name";
+
+ public static final String WORKFLOW_ADJACENCY_PREFIX_STRING =
+ "mapreduce.workflow.adjacency.";
+
+ public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN =
+ "^mapreduce\\.workflow\\.adjacency\\..+";
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java?rev=1440155&r1=1440154&r2=1440155&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java Tue Jan 29 21:35:24 2013
@@ -52,6 +52,29 @@ public class JobSubmittedEvent implement
public JobSubmittedEvent(JobID id, String jobName, String userName,
long submitTime, String jobConfPath,
Map<JobACL, AccessControlList> jobACLs, String jobQueueName) {
+ this(id, jobName, userName, submitTime, jobConfPath, jobACLs,
+ jobQueueName, "", "", "", "");
+ }
+
+ /**
+ * Create an event to record job submission
+ * @param id The job Id of the job
+ * @param jobName Name of the job
+ * @param userName Name of the user who submitted the job
+ * @param submitTime Time of submission
+ * @param jobConfPath Path of the Job Configuration file
+ * @param jobACLs The configured acls for the job.
+ * @param jobQueueName The job-queue to which this job was submitted to
+ * @param workflowId The Id of the workflow
+ * @param workflowName The name of the workflow
+ * @param workflowNodeName The node name of the workflow
+ * @param workflowAdjacencies The adjacencies of the workflow
+ */
+ public JobSubmittedEvent(JobID id, String jobName, String userName,
+ long submitTime, String jobConfPath,
+ Map<JobACL, AccessControlList> jobACLs, String jobQueueName,
+ String workflowId, String workflowName, String workflowNodeName,
+ String workflowAdjacencies) {
datum.jobid = new Utf8(id.toString());
datum.jobName = new Utf8(jobName);
datum.userName = new Utf8(userName);
@@ -66,6 +89,18 @@ public class JobSubmittedEvent implement
if (jobQueueName != null) {
datum.jobQueueName = new Utf8(jobQueueName);
}
+ if (workflowId != null) {
+ datum.workflowId = new Utf8(workflowId);
+ }
+ if (workflowName != null) {
+ datum.workflowName = new Utf8(workflowName);
+ }
+ if (workflowNodeName != null) {
+ datum.workflowNodeName = new Utf8(workflowNodeName);
+ }
+ if (workflowAdjacencies != null) {
+ datum.workflowAdjacencies = new Utf8(workflowAdjacencies);
+ }
}
JobSubmittedEvent() {}
@@ -105,6 +140,34 @@ public class JobSubmittedEvent implement
}
return jobAcls;
}
+ /** Get the id of the workflow */
+ public String getWorkflowId() {
+ if (datum.workflowId != null) {
+ return datum.workflowId.toString();
+ }
+ return null;
+ }
+ /** Get the name of the workflow */
+ public String getWorkflowName() {
+ if (datum.workflowName != null) {
+ return datum.workflowName.toString();
+ }
+ return null;
+ }
+ /** Get the node name of the workflow */
+ public String getWorkflowNodeName() {
+ if (datum.workflowNodeName != null) {
+ return datum.workflowNodeName.toString();
+ }
+ return null;
+ }
+ /** Get the adjacencies of the workflow */
+ public String getWorkflowAdjacencies() {
+ if (datum.workflowAdjacencies != null) {
+ return datum.workflowAdjacencies.toString();
+ }
+ return null;
+ }
/** Get the event type */
public EventType getEventType() { return EventType.JOB_SUBMITTED; }
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java?rev=1440155&r1=1440154&r2=1440155&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java Tue Jan 29 21:35:24 2013
@@ -46,10 +46,13 @@ public class TaskAttemptStartedEvent imp
* @param httpPort The port number of the tracker
* @param shufflePort The shuffle port number of the container
* @param containerId The containerId for the task attempt.
+ * @param locality The locality of the task attempt
+ * @param avataar The avataar of the task attempt
*/
public TaskAttemptStartedEvent( TaskAttemptID attemptId,
TaskType taskType, long startTime, String trackerName,
- int httpPort, int shufflePort, ContainerId containerId) {
+ int httpPort, int shufflePort, ContainerId containerId,
+ String locality, String avataar) {
datum.attemptId = new Utf8(attemptId.toString());
datum.taskid = new Utf8(attemptId.getTaskID().toString());
datum.startTime = startTime;
@@ -58,14 +61,21 @@ public class TaskAttemptStartedEvent imp
datum.httpPort = httpPort;
datum.shufflePort = shufflePort;
datum.containerId = new Utf8(containerId.toString());
+ if (locality != null) {
+ datum.locality = new Utf8(locality);
+ }
+ if (avataar != null) {
+ datum.avataar = new Utf8(avataar);
+ }
}
// TODO Remove after MrV1 is removed.
// Using a dummy containerId to prevent jobHistory parse failures.
public TaskAttemptStartedEvent(TaskAttemptID attemptId, TaskType taskType,
- long startTime, String trackerName, int httpPort, int shufflePort) {
+ long startTime, String trackerName, int httpPort, int shufflePort,
+ String locality, String avataar) {
this(attemptId, taskType, startTime, trackerName, httpPort, shufflePort,
- ConverterUtils.toContainerId("container_-1_-1_-1_-1"));
+ ConverterUtils.toContainerId("container_-1_-1_-1_-1"), locality, avataar);
}
TaskAttemptStartedEvent() {}
@@ -105,4 +115,19 @@ public class TaskAttemptStartedEvent imp
public ContainerId getContainerId() {
return ConverterUtils.toContainerId(datum.containerId.toString());
}
+ /** Get the locality */
+ public String getLocality() {
+ if (datum.locality != null) {
+ return datum.locality.toString();
+ }
+ return null;
+ }
+ /** Get the avataar */
+ public String getAvataar() {
+ if (datum.avataar != null) {
+ return datum.avataar.toString();
+ }
+ return null;
+ }
+
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java?rev=1440155&r1=1440154&r2=1440155&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java Tue Jan 29 21:35:24 2013
@@ -89,7 +89,7 @@ public class MergeManagerImpl<K, V> impl
new TreeSet<InMemoryMapOutput<K,V>>(new MapOutputComparator<K, V>());
private final MergeThread<InMemoryMapOutput<K,V>, K,V> inMemoryMerger;
- Set<Path> onDiskMapOutputs = new TreeSet<Path>();
+ Set<CompressAwarePath> onDiskMapOutputs = new TreeSet<CompressAwarePath>();
private final OnDiskMerger onDiskMerger;
private final long memoryLimit;
@@ -336,7 +336,7 @@ public class MergeManagerImpl<K, V> impl
inMemoryMergedMapOutputs.size());
}
- public synchronized void closeOnDiskFile(Path file) {
+ public synchronized void closeOnDiskFile(CompressAwarePath file) {
onDiskMapOutputs.add(file);
if (onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {
@@ -356,7 +356,7 @@ public class MergeManagerImpl<K, V> impl
List<InMemoryMapOutput<K, V>> memory =
new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
memory.addAll(inMemoryMapOutputs);
- List<Path> disk = new ArrayList<Path>(onDiskMapOutputs);
+ List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);
return finalMerge(jobConf, rfs, memory, disk);
}
@@ -456,6 +456,7 @@ public class MergeManagerImpl<K, V> impl
codec, null);
RawKeyValueIterator rIter = null;
+ CompressAwarePath compressAwarePath;
try {
LOG.info("Initiating in-memory merge with " + noInMemorySegments +
" segments...");
@@ -474,6 +475,8 @@ public class MergeManagerImpl<K, V> impl
combineCollector.setWriter(writer);
combineAndSpill(rIter, reduceCombineInputCounter);
}
+ compressAwarePath = new CompressAwarePath(outputPath,
+ writer.getRawLength());
writer.close();
LOG.info(reduceId +
@@ -489,12 +492,12 @@ public class MergeManagerImpl<K, V> impl
}
// Note the output of the merge
- closeOnDiskFile(outputPath);
+ closeOnDiskFile(compressAwarePath);
}
}
- private class OnDiskMerger extends MergeThread<Path,K,V> {
+ private class OnDiskMerger extends MergeThread<CompressAwarePath,K,V> {
public OnDiskMerger(MergeManagerImpl<K, V> manager) {
super(manager, Integer.MAX_VALUE, exceptionReporter);
@@ -503,7 +506,7 @@ public class MergeManagerImpl<K, V> impl
}
@Override
- public void merge(List<Path> inputs) throws IOException {
+ public void merge(List<CompressAwarePath> inputs) throws IOException {
// sanity check
if (inputs == null || inputs.isEmpty()) {
LOG.info("No ondisk files to merge...");
@@ -518,7 +521,7 @@ public class MergeManagerImpl<K, V> impl
" map outputs on disk. Triggering merge...");
// 1. Prepare the list of files to be merged.
- for (Path file : inputs) {
+ for (CompressAwarePath file : inputs) {
approxOutputSize += localFS.getFileStatus(file).getLen();
}
@@ -536,6 +539,7 @@ public class MergeManagerImpl<K, V> impl
(Class<V>) jobConf.getMapOutputValueClass(),
codec, null);
RawKeyValueIterator iter = null;
+ CompressAwarePath compressAwarePath;
Path tmpDir = new Path(reduceId.toString());
try {
iter = Merger.merge(jobConf, rfs,
@@ -548,13 +552,15 @@ public class MergeManagerImpl<K, V> impl
mergedMapOutputsCounter, null);
Merger.writeFile(iter, writer, reporter, jobConf);
+ compressAwarePath = new CompressAwarePath(outputPath,
+ writer.getRawLength());
writer.close();
} catch (IOException e) {
localFS.delete(outputPath, true);
throw e;
}
- closeOnDiskFile(outputPath);
+ closeOnDiskFile(compressAwarePath);
LOG.info(reduceId +
" Finished merging " + inputs.size() +
@@ -653,7 +659,7 @@ public class MergeManagerImpl<K, V> impl
private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
List<InMemoryMapOutput<K,V>> inMemoryMapOutputs,
- List<Path> onDiskMapOutputs
+ List<CompressAwarePath> onDiskMapOutputs
) throws IOException {
LOG.info("finalMerge called with " +
inMemoryMapOutputs.size() + " in-memory map-outputs and " +
@@ -712,7 +718,8 @@ public class MergeManagerImpl<K, V> impl
try {
Merger.writeFile(rIter, writer, reporter, job);
// add to list of final disk outputs.
- onDiskMapOutputs.add(outputPath);
+ onDiskMapOutputs.add(new CompressAwarePath(outputPath,
+ writer.getRawLength()));
} catch (IOException e) {
if (null != outputPath) {
try {
@@ -742,15 +749,19 @@ public class MergeManagerImpl<K, V> impl
// segments on disk
List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>();
long onDiskBytes = inMemToDiskBytes;
- Path[] onDisk = onDiskMapOutputs.toArray(new Path[onDiskMapOutputs.size()]);
- for (Path file : onDisk) {
- onDiskBytes += fs.getFileStatus(file).getLen();
- LOG.debug("Disk file: " + file + " Length is " +
- fs.getFileStatus(file).getLen());
+ long rawBytes = inMemToDiskBytes;
+ CompressAwarePath[] onDisk = onDiskMapOutputs.toArray(
+ new CompressAwarePath[onDiskMapOutputs.size()]);
+ for (CompressAwarePath file : onDisk) {
+ long fileLength = fs.getFileStatus(file).getLen();
+ onDiskBytes += fileLength;
+ rawBytes += (file.getRawDataLength() > 0) ? file.getRawDataLength() : fileLength;
+
+ LOG.debug("Disk file: " + file + " Length is " + fileLength);
diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs,
(file.toString().endsWith(
Task.MERGED_OUTPUT_PREFIX) ?
- null : mergedMapOutputsCounter)
+ null : mergedMapOutputsCounter), file.getRawDataLength()
));
}
LOG.info("Merging " + onDisk.length + " files, " +
@@ -786,7 +797,7 @@ public class MergeManagerImpl<K, V> impl
return diskMerge;
}
finalSegments.add(new Segment<K,V>(
- new RawKVIteratorReader(diskMerge, onDiskBytes), true));
+ new RawKVIteratorReader(diskMerge, onDiskBytes), true, rawBytes));
}
return Merger.merge(job, fs, keyClass, valueClass,
finalSegments, finalSegments.size(), tmpDir,
@@ -794,4 +805,27 @@ public class MergeManagerImpl<K, V> impl
null);
}
+
+ static class CompressAwarePath extends Path {
+ private long rawDataLength;
+
+ public CompressAwarePath(Path path, long rawDataLength) {
+ super(path.toUri());
+ this.rawDataLength = rawDataLength;
+ }
+
+ public long getRawDataLength() {
+ return rawDataLength;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ return super.equals(other);
+ }
+
+ @Override
+ public int hashCode() {
+ return super.hashCode();
+ }
+ }
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java?rev=1440155&r1=1440154&r2=1440155&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java Tue Jan 29 21:35:24 2013
@@ -37,6 +37,7 @@ import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
@InterfaceAudience.Private
@InterfaceStability.Unstable
@@ -112,7 +113,9 @@ class OnDiskMapOutput<K, V> extends MapO
@Override
public void commit() throws IOException {
localFS.rename(tmpOutputPath, outputPath);
- merger.closeOnDiskFile(outputPath);
+ CompressAwarePath compressAwarePath = new CompressAwarePath(outputPath,
+ getSize());
+ merger.closeOnDiskFile(compressAwarePath);
}
@Override
Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1438241-1440147