You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sz...@apache.org on 2013/01/30 00:54:02 UTC

svn commit: r1440222 - in /hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project: ./ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ hadoop-mapreduce-client/hadoop-mapreduce-clien...

Author: szetszwo
Date: Tue Jan 29 23:53:59 2013
New Revision: 1440222

URL: http://svn.apache.org/viewvc?rev=1440222&view=rev
Log:
Merge r1438306 through r1440221 from trunk.

Added:
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/Avataar.java
      - copied unchanged from r1440221, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/Avataar.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/Locality.java
      - copied unchanged from r1440221, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/Locality.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
      - copied unchanged from r1440221, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMerger.java
Removed:
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestMerger.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIndexCache.java
Modified:
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (props changed)

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1438306-1440221

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt?rev=1440222&r1=1440221&r2=1440222&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt Tue Jan 29 23:53:59 2013
@@ -11,16 +11,9 @@ Trunk (Unreleased)
     MAPREDUCE-2669. Add new examples for Mean, Median, and Standard Deviation.
     (Plamen Jeliazkov via shv)
 
-    MAPREDUCE-4049. Experimental api to allow for alternate shuffle plugins.
-    (Avner BenHanoch via acmurthy) 
-
-    MAPREDUCE-4807. Allow MapOutputBuffer to be pluggable. (masokan via tucu)
-
     MAPREDUCE-4887. Add RehashPartitioner, to smooth distributions
     with poor implementations of Object#hashCode().  (Radim Kolar via cutting)
 
-    MAPREDUCE-4808. Refactor MapOutput and MergeManager to facilitate reuse by Shuffle implementations. (masokan via tucu)
-
   IMPROVEMENTS
 
     MAPREDUCE-3787. [Gridmix] Optimize job monitoring and STRESS mode for
@@ -78,9 +71,6 @@ Trunk (Unreleased)
     MAPREDUCE-4735. Make arguments in TestDFSIO case insensitive.
     (Brandon Li via suresh)
 
-    MAPREDUCE-4809. Change visibility of classes for pluggable sort changes. 
-    (masokan via tucu)
-
   BUG FIXES
 
     MAPREDUCE-4272. SortedRanges.Range#compareTo is not spec compliant.
@@ -180,6 +170,14 @@ Release 2.0.3-alpha - Unreleased 
     MAPREDUCE-4810. Added new admin command options for MR AM. (Jerry Chen via
     vinodkv)
 
+    MAPREDUCE-4049. Experimental api to allow for alternate shuffle plugins.
+    (Avner BenHanoch via acmurthy) 
+
+    MAPREDUCE-4807. Allow MapOutputBuffer to be pluggable. (masokan via tucu)
+
+    MAPREDUCE-4808. Refactor MapOutput and MergeManager to facilitate reuse 
+    by Shuffle implementations. (masokan via tucu)
+
   IMPROVEMENTS
 
     MAPREDUCE-3678. The Map tasks logs should have the value of input
@@ -211,6 +209,12 @@ Release 2.0.3-alpha - Unreleased 
 
     MAPREDUCE-4949. Enable multiple pi jobs to run in parallel. (sandyr via tucu)
 
+    MAPREDUCE-4809. Change visibility of classes for pluggable sort changes. 
+    (masokan via tucu)
+
+    MAPREDUCE-4838. Add additional fields like Locality, Avataar to the
+    JobHistory logs. (Zhijie Shen via sseth)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -268,6 +272,9 @@ Release 2.0.3-alpha - Unreleased 
     MAPREDUCE-4948. Fix a failing unit test TestYARNRunner.testHistoryServerToken.
     (Junping Du via sseth)
 
+    MAPREDUCE-4803. Remove duplicate copy of TestIndexCache. (Mariappan Asokan
+    via sseth)
+
     MAPREDUCE-2264. Job status exceeds 100% in some cases. 
     (devaraj.k and sandyr via tucu)
 

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1438306-1440221

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1438306-1440221

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1440222&r1=1440221&r2=1440222&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Tue Jan 29 23:53:59 2013
@@ -28,6 +28,7 @@ import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -1192,6 +1193,39 @@ public class JobImpl implements org.apac
     }
   }
   */
+  /**
+    * Get the workflow adjacencies from the job conf
+    * The string returned is of the form "key"="value" "key"="value" ...
+    */
+  private static String getWorkflowAdjacencies(Configuration conf) {
+    int prefixLen = MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING.length();
+    Map<String,String> adjacencies = 
+        conf.getValByRegex(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_PATTERN);
+    if (adjacencies.isEmpty()) {
+      return "";
+    }
+    int size = 0;
+    for (Entry<String,String> entry : adjacencies.entrySet()) {
+      int keyLen = entry.getKey().length();
+      size += keyLen - prefixLen;
+      size += entry.getValue().length() + 6;
+    }
+    StringBuilder sb = new StringBuilder(size);
+    for (Entry<String,String> entry : adjacencies.entrySet()) {
+      int keyLen = entry.getKey().length();
+      sb.append("\"");
+      sb.append(escapeString(entry.getKey().substring(prefixLen, keyLen)));
+      sb.append("\"=\"");
+      sb.append(escapeString(entry.getValue()));
+      sb.append("\" ");
+    }
+    return sb.toString();
+  }
+  
+  public static String escapeString(String data) {
+    return StringUtils.escapeString(data, StringUtils.ESCAPE_CHAR,
+        new char[] {'"', '=', '.'});
+  }
 
   public static class InitTransition 
       implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
@@ -1217,7 +1251,11 @@ public class JobImpl implements org.apac
             job.conf.get(MRJobConfig.USER_NAME, "mapred"),
             job.appSubmitTime,
             job.remoteJobConfFile.toString(),
-            job.jobACLs, job.queueName);
+            job.jobACLs, job.queueName,
+            job.conf.get(MRJobConfig.WORKFLOW_ID, ""),
+            job.conf.get(MRJobConfig.WORKFLOW_NAME, ""),
+            job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""),
+            getWorkflowAdjacencies(job.conf));
         job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
         //TODO JH Verify jobACLs, UserName via UGI?
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1440222&r1=1440221&r2=1440222&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Tue Jan 29 23:53:59 2013
@@ -66,6 +66,8 @@ import org.apache.hadoop.mapreduce.jobhi
 import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.api.records.Avataar;
+import org.apache.hadoop.mapreduce.v2.api.records.Locality;
 import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
@@ -156,7 +158,8 @@ public abstract class TaskAttemptImpl im
   private final org.apache.hadoop.mapred.JobID oldJobId;
   private final TaskAttemptListener taskAttemptListener;
   private final Resource resourceCapability;
-  private final String[] dataLocalHosts;
+  protected Set<String> dataLocalHosts;
+  protected Set<String> dataLocalRacks;
   private final List<String> diagnostics = new ArrayList<String>();
   private final Lock readLock;
   private final Lock writeLock;
@@ -175,6 +178,8 @@ public abstract class TaskAttemptImpl im
   private int shufflePort = -1;
   private String trackerName;
   private int httpPort;
+  private Locality locality;
+  private Avataar avataar;
 
   private static final CleanupContainerTransition CLEANUP_CONTAINER_TRANSITION =
     new CleanupContainerTransition();
@@ -532,8 +537,16 @@ public abstract class TaskAttemptImpl im
         getMemoryRequired(conf, taskId.getTaskType()));
     this.resourceCapability.setVirtualCores(
         getCpuRequired(conf, taskId.getTaskType()));
-    this.dataLocalHosts = dataLocalHosts;
+
+    this.dataLocalHosts = resolveHosts(dataLocalHosts);
     RackResolver.init(conf);
+    this.dataLocalRacks = new HashSet<String>(); 
+    for (String host : this.dataLocalHosts) {
+      this.dataLocalRacks.add(RackResolver.resolve(host).getNetworkLocation());
+    }
+
+    locality = Locality.OFF_SWITCH;
+    avataar = Avataar.VIRGIN;
 
     // This "this leak" is okay because the retained pointer is in an
     //  instance variable.
@@ -1032,6 +1045,23 @@ public abstract class TaskAttemptImpl im
     }
   }
 
+  public Locality getLocality() {
+    return locality;
+  }
+  
+  public void setLocality(Locality locality) {
+    this.locality = locality;
+  }
+
+  public Avataar getAvataar()
+  {
+    return avataar;
+  }
+  
+  public void setAvataar(Avataar avataar) {
+    this.avataar = avataar;
+  }
+  
   private static TaskAttemptState getExternalState(
       TaskAttemptStateInternal smState) {
     switch (smState) {
@@ -1232,25 +1262,27 @@ public abstract class TaskAttemptImpl im
                 taskAttempt.attemptId, 
                 taskAttempt.resourceCapability));
       } else {
-        Set<String> racks = new HashSet<String>(); 
-        for (String host : taskAttempt.dataLocalHosts) {
-          racks.add(RackResolver.resolve(host).getNetworkLocation());
-        }
         taskAttempt.eventHandler.handle(new ContainerRequestEvent(
-            taskAttempt.attemptId, taskAttempt.resourceCapability, taskAttempt
-                .resolveHosts(taskAttempt.dataLocalHosts), racks
-                .toArray(new String[racks.size()])));
+            taskAttempt.attemptId, taskAttempt.resourceCapability,
+            taskAttempt.dataLocalHosts.toArray(
+                new String[taskAttempt.dataLocalHosts.size()]),
+            taskAttempt.dataLocalRacks.toArray(
+                new String[taskAttempt.dataLocalRacks.size()])));
       }
     }
   }
 
-  protected String[] resolveHosts(String[] src) {
-    String[] result = new String[src.length];
-    for (int i = 0; i < src.length; i++) {
-      if (isIP(src[i])) {
-        result[i] = resolveHost(src[i]);
-      } else {
-        result[i] = src[i];
+  protected Set<String> resolveHosts(String[] src) {
+    Set<String> result = new HashSet<String>();
+    if (src != null) {
+      for (int i = 0; i < src.length; i++) {
+        if (src[i] == null) {
+          continue;
+        } else if (isIP(src[i])) {
+          result.add(resolveHost(src[i]));
+        } else {
+          result.add(src[i]);
+        }
       }
     }
     return result;
@@ -1300,6 +1332,20 @@ public abstract class TaskAttemptImpl im
           taskAttempt.remoteTask.isMapTask(), taskAttempt.containerID.getId());
       taskAttempt.taskAttemptListener.registerPendingTask(
           taskAttempt.remoteTask, taskAttempt.jvmID);
+
+      taskAttempt.locality = Locality.OFF_SWITCH;
+      if (taskAttempt.dataLocalHosts.size() > 0) {
+        String cHost = taskAttempt.resolveHost(
+            taskAttempt.containerNodeId.getHost());
+        if (taskAttempt.dataLocalHosts.contains(cHost)) {
+          taskAttempt.locality = Locality.NODE_LOCAL;
+        }
+      }
+      if (taskAttempt.locality == Locality.OFF_SWITCH) {
+        if (taskAttempt.dataLocalRacks.contains(taskAttempt.nodeRackName)) {
+          taskAttempt.locality = Locality.RACK_LOCAL;
+        }
+      }
       
       //launch the container
       //create the container object to be launched for a given Task attempt
@@ -1376,7 +1422,7 @@ public abstract class TaskAttemptImpl im
             taskAttempt.attemptId.getTaskId().getJobId(), tauce));
       } else {
         LOG.debug("Not generating HistoryFinish event since start event not " +
-        		"generated for taskAttempt: " + taskAttempt.getID());
+            "generated for taskAttempt: " + taskAttempt.getID());
       }
     }
   }
@@ -1421,7 +1467,8 @@ public abstract class TaskAttemptImpl im
             TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
             taskAttempt.launchTime,
             nodeHttpInetAddr.getHostName(), nodeHttpInetAddr.getPort(),
-            taskAttempt.shufflePort, taskAttempt.containerID);
+            taskAttempt.shufflePort, taskAttempt.containerID,
+            taskAttempt.locality.toString(), taskAttempt.avataar.toString());
       taskAttempt.eventHandler.handle
           (new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tase));
       taskAttempt.eventHandler.handle
@@ -1510,7 +1557,7 @@ public abstract class TaskAttemptImpl im
         // handling failed map/reduce events.
       }else {
         LOG.debug("Not generating HistoryFinish event since start event not " +
-        		"generated for taskAttempt: " + taskAttempt.getID());
+            "generated for taskAttempt: " + taskAttempt.getID());
       }
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
           taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
@@ -1580,7 +1627,7 @@ public abstract class TaskAttemptImpl im
             taskAttempt.attemptId.getTaskId().getJobId(), tauce));
       }else {
         LOG.debug("Not generating HistoryFinish event since start event not " +
-        		"generated for taskAttempt: " + taskAttempt.getID());
+            "generated for taskAttempt: " + taskAttempt.getID());
       }
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
           taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
@@ -1648,7 +1695,7 @@ public abstract class TaskAttemptImpl im
             taskAttempt.attemptId.getTaskId().getJobId(), tauce));
       }else {
         LOG.debug("Not generating HistoryFinish event since start event not " +
-        		"generated for taskAttempt: " + taskAttempt.getID());
+            "generated for taskAttempt: " + taskAttempt.getID());
       }
 //      taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.KILLED); Not logging Map/Reduce attempts in case of failure.
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1440222&r1=1440221&r2=1440222&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Tue Jan 29 23:53:59 2013
@@ -46,6 +46,7 @@ import org.apache.hadoop.mapreduce.jobhi
 import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.api.records.Avataar;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
@@ -594,8 +595,9 @@ public abstract class TaskImpl implement
   }
 
   // This is always called in the Write Lock
-  private void addAndScheduleAttempt() {
+  private void addAndScheduleAttempt(Avataar avataar) {
     TaskAttempt attempt = createAttempt();
+    ((TaskAttemptImpl) attempt).setAvataar(avataar);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Created attempt " + attempt.getID());
     }
@@ -749,7 +751,7 @@ public abstract class TaskImpl implement
 
     @Override
     public void transition(TaskImpl task, TaskEvent event) {
-      task.addAndScheduleAttempt();
+      task.addAndScheduleAttempt(Avataar.VIRGIN);
       task.scheduledTime = task.clock.getTime();
       TaskStartedEvent tse = new TaskStartedEvent(
           TypeConverter.fromYarn(task.taskId), task.getLaunchTime(),
@@ -772,7 +774,7 @@ public abstract class TaskImpl implement
     @Override
     public void transition(TaskImpl task, TaskEvent event) {
       LOG.info("Scheduling a redundant attempt for task " + task.taskId);
-      task.addAndScheduleAttempt();
+      task.addAndScheduleAttempt(Avataar.SPECULATIVE);
     }
   }
 
@@ -849,7 +851,7 @@ public abstract class TaskImpl implement
       task.finishedAttempts.add(taskAttemptId);
       task.inProgressAttempts.remove(taskAttemptId);
       if (task.successfulAttempt == null) {
-        task.addAndScheduleAttempt();
+        task.addAndScheduleAttempt(Avataar.VIRGIN);
       }
     }
   }
@@ -937,7 +939,7 @@ public abstract class TaskImpl implement
         task.inProgressAttempts.remove(taskAttemptId);
         if (task.inProgressAttempts.size() == 0
             && task.successfulAttempt == null) {
-          task.addAndScheduleAttempt();
+          task.addAndScheduleAttempt(Avataar.VIRGIN);
         }
       } else {
         task.handleTaskAttemptCompletion(
@@ -1053,7 +1055,7 @@ public abstract class TaskImpl implement
       // from the map splitInfo. So the bad node might be sent as a location
       // to the RM. But the RM would ignore that just like it would ignore
       // currently pending container requests affinitized to bad nodes.
-      task.addAndScheduleAttempt();
+      task.addAndScheduleAttempt(Avataar.VIRGIN);
       return TaskStateInternal.SCHEDULED;
     }
   }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1440222&r1=1440221&r2=1440222&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Tue Jan 29 23:53:59 2013
@@ -67,7 +67,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.util.RackResolver;
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1440222&r1=1440221&r2=1440222&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Tue Jan 29 23:53:59 2013
@@ -33,6 +33,9 @@ import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.jobhistory.EventType;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.MRConfig;
@@ -66,6 +69,7 @@ import org.apache.hadoop.yarn.SystemCloc
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
@@ -105,6 +109,13 @@ public class TestJobImpl {
     Configuration conf = new Configuration();
     conf.setInt(MRJobConfig.NUM_REDUCES, 0);
     conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    conf.set(MRJobConfig.WORKFLOW_ID, "testId");
+    conf.set(MRJobConfig.WORKFLOW_NAME, "testName");
+    conf.set(MRJobConfig.WORKFLOW_NODE_NAME, "testNodeName");
+    conf.set(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING + "key1", "value1");
+    conf.set(MRJobConfig.WORKFLOW_ADJACENCY_PREFIX_STRING + "key2", "value2");
+    
+ 
     AsyncDispatcher dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();
@@ -114,6 +125,9 @@ public class TestJobImpl {
     commitHandler.init(conf);
     commitHandler.start();
 
+    JobSubmittedEventHandler jseHandler = new JobSubmittedEventHandler("testId",
+        "testName", "testNodeName", "\"key2\"=\"value2\" \"key1\"=\"value1\" ");
+    dispatcher.register(EventType.class, jseHandler);
     JobImpl job = createStubbedJob(conf, dispatcher, 0);
     job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
@@ -121,6 +135,11 @@ public class TestJobImpl {
     assertJobState(job, JobStateInternal.SUCCEEDED);
     dispatcher.stop();
     commitHandler.stop();
+    try {
+      Assert.assertTrue(jseHandler.getAssertValue());
+    } catch (InterruptedException e) {
+      Assert.fail("Workflow related attributes are not tested properly");
+    }
   }
 
   @Test(timeout=20000)
@@ -614,6 +633,67 @@ public class TestJobImpl {
     Assert.assertEquals(state, job.getInternalState());
   }
 
+  private static class JobSubmittedEventHandler implements
+      EventHandler<JobHistoryEvent> {
+
+    private String workflowId;
+    
+    private String workflowName;
+    
+    private String workflowNodeName;
+    
+    private String workflowAdjacencies;
+    
+    private Boolean assertBoolean;
+
+    public JobSubmittedEventHandler(String workflowId, String workflowName,
+        String workflowNodeName, String workflowAdjacencies) {
+      this.workflowId = workflowId;
+      this.workflowName = workflowName;
+      this.workflowNodeName = workflowNodeName;
+      this.workflowAdjacencies = workflowAdjacencies;
+      assertBoolean = null;
+    }
+
+    @Override
+    public void handle(JobHistoryEvent jhEvent) {
+      if (jhEvent.getType() != EventType.JOB_SUBMITTED) {
+        return;
+      }
+      JobSubmittedEvent jsEvent = (JobSubmittedEvent) jhEvent.getHistoryEvent();
+      if (!workflowId.equals(jsEvent.getWorkflowId())) {
+        setAssertValue(false);
+        return;
+      }
+      if (!workflowName.equals(jsEvent.getWorkflowName())) {
+        setAssertValue(false);
+        return;
+      }
+      if (!workflowNodeName.equals(jsEvent.getWorkflowNodeName())) {
+        setAssertValue(false);
+        return;
+      }
+      if (!workflowAdjacencies.equals(jsEvent.getWorkflowAdjacencies())) {
+        setAssertValue(false);
+        return;
+      }
+      setAssertValue(true);
+    }
+    
+    private synchronized void setAssertValue(Boolean bool) {
+      assertBoolean = bool;
+      notify();
+    }
+    
+    public synchronized boolean getAssertValue() throws InterruptedException {
+      while (assertBoolean == null) {
+        wait();
+      }
+      return assertBoolean;
+    }
+
+  }
+
   private static class StubbedJob extends JobImpl {
     //override the init transition
     private final InitTransition initTransition;

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java?rev=1440222&r1=1440221&r2=1440222&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java Tue Jan 29 23:53:59 2013
@@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.jobhi
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.Locality;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
@@ -157,6 +158,7 @@ public class TestTaskAttempt{
         createMapTaskAttemptImplForTest(eventHandler, splitInfo);
     TaskAttemptImpl spyTa = spy(mockTaskAttempt);
     when(spyTa.resolveHost(hosts[0])).thenReturn("host1");
+    spyTa.dataLocalHosts = spyTa.resolveHosts(splitInfo.getLocations());
 
     TaskAttemptEvent mockTAEvent = mock(TaskAttemptEvent.class);
     rct.transition(spyTa, mockTAEvent);
@@ -360,6 +362,8 @@ public class TestTaskAttempt{
     taImpl.handle(new TaskAttemptEvent(attemptId,
         TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
     assertFalse(eventHandler.internalError);
+    assertEquals("Task attempt is not assigned on the local node", 
+        Locality.NODE_LOCAL, taImpl.getLocality());
   }
 
   @Test
@@ -398,7 +402,7 @@ public class TestTaskAttempt{
           mock(Token.class), new Credentials(),
           new SystemClock(), appCtx);
 
-    NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+    NodeId nid = BuilderUtils.newNodeId("127.0.0.2", 0);
     ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
     Container container = mock(Container.class);
     when(container.getId()).thenReturn(contId);
@@ -416,6 +420,8 @@ public class TestTaskAttempt{
         TaskAttemptEventType.TA_CONTAINER_CLEANED));
     assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
         eventHandler.internalError);
+    assertEquals("Task attempt is not assigned on the local rack",
+        Locality.RACK_LOCAL, taImpl.getLocality());
   }
 
   @Test
@@ -439,7 +445,7 @@ public class TestTaskAttempt{
     jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
 
     TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
-    when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
+    when(splits.getLocations()).thenReturn(new String[] {});
 
     AppContext appCtx = mock(AppContext.class);
     ClusterInfo clusterInfo = mock(ClusterInfo.class);
@@ -475,6 +481,8 @@ public class TestTaskAttempt{
         TaskAttemptEventType.TA_CONTAINER_CLEANED));
     assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
         eventHandler.internalError);
+    assertEquals("Task attempt is assigned locally", Locality.OFF_SWITCH,
+        taImpl.getLocality());
   }
 
   @Test

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1440222&r1=1440221&r2=1440222&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Tue Jan 29 23:53:59 2013
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapred.TaskUmbi
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.Avataar;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
@@ -46,10 +47,12 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
@@ -254,6 +257,7 @@ public class TestTaskImpl {
     mockTask.handle(new TaskEvent(taskId, 
         TaskEventType.T_SCHEDULE));
     assertTaskScheduledState();
+    assertTaskAttemptAvataar(Avataar.VIRGIN);
   }
   
   private void killTask(TaskId taskId) {
@@ -338,6 +342,19 @@ public class TestTaskImpl {
   private void assertTaskSucceededState() {
     assertEquals(TaskState.SUCCEEDED, mockTask.getState());
   }
+
+  /**
+   * Fails unless some attempt of mockTask has the given {@link Avataar}.
+   */
+  private void assertTaskAttemptAvataar(Avataar avataar) {
+    for (TaskAttempt taskAttempt : mockTask.getAttempts().values()) {
+      if (((TaskAttemptImpl) taskAttempt).getAvataar() == avataar) {
+        return;
+      }
+    }
+    fail("There is no " + (avataar == Avataar.VIRGIN ? "virgin" : "speculative")
+        + " task attempt");
+  }
   
   @Test
   public void testInit() {
@@ -516,6 +533,9 @@ public class TestTaskImpl {
     
     // The task should still be in the succeeded state
     assertTaskSucceededState();
+    
+    // The task should contain a speculative task attempt
+    assertTaskAttemptAvataar(Avataar.SPECULATIVE);
   }
   
   @Test

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr?rev=1440222&r1=1440221&r2=1440222&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr Tue Jan 29 23:53:59 2013
@@ -91,7 +91,11 @@
                                     "values": "string"
                                    }
           },
-          {"name": "jobQueueName", "type": "string"}
+          {"name": "jobQueueName", "type": "string"},
+          {"name": "workflowId", "type": "string"},
+          {"name": "workflowName", "type": "string"},
+          {"name": "workflowNodeName", "type": "string"},
+          {"name": "workflowAdjacencies", "type": "string"}
       ]
      },
 
@@ -191,7 +195,9 @@
           {"name": "trackerName", "type": "string"},
           {"name": "httpPort", "type": "int"},
           {"name": "shufflePort", "type": "int"},
-          {"name": "containerId", "type": "string"}
+          {"name": "containerId", "type": "string"},
+          {"name": "locality", "type": "string"},
+          {"name": "avataar", "type": "string"}
       ]
      },
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1440222&r1=1440221&r2=1440222&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Tue Jan 29 23:53:59 2013
@@ -647,5 +647,18 @@ public interface MRJobConfig {
       "$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*",
       "$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*",
   };
+
+  public static final String WORKFLOW_ID = "mapreduce.workflow.id";
+  
+  public static final String WORKFLOW_NAME = "mapreduce.workflow.name";
+  
+  public static final String WORKFLOW_NODE_NAME =
+      "mapreduce.workflow.node.name";
+  
+  public static final String WORKFLOW_ADJACENCY_PREFIX_STRING =
+      "mapreduce.workflow.adjacency.";
+  
+  public static final String WORKFLOW_ADJACENCY_PREFIX_PATTERN =
+      "^mapreduce\\.workflow\\.adjacency\\..+";
   
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java?rev=1440222&r1=1440221&r2=1440222&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java Tue Jan 29 23:53:59 2013
@@ -52,6 +52,29 @@ public class JobSubmittedEvent implement
   public JobSubmittedEvent(JobID id, String jobName, String userName,
       long submitTime, String jobConfPath,
       Map<JobACL, AccessControlList> jobACLs, String jobQueueName) {
+    this(id, jobName, userName, submitTime, jobConfPath, jobACLs,
+        jobQueueName, "", "", "", "");
+  }
+
+  /**
+   * Create an event to record job submission
+   * @param id The job Id of the job
+   * @param jobName Name of the job
+   * @param userName Name of the user who submitted the job
+   * @param submitTime Time of submission
+   * @param jobConfPath Path of the Job Configuration file
+   * @param jobACLs The configured acls for the job.
+   * @param jobQueueName The job-queue to which this job was submitted
+   * @param workflowId The Id of the workflow
+   * @param workflowName The name of the workflow
+   * @param workflowNodeName The node name of the workflow
+   * @param workflowAdjacencies The adjacencies of the workflow
+   */
+  public JobSubmittedEvent(JobID id, String jobName, String userName,
+      long submitTime, String jobConfPath,
+      Map<JobACL, AccessControlList> jobACLs, String jobQueueName,
+      String workflowId, String workflowName, String workflowNodeName,
+      String workflowAdjacencies) {
     datum.jobid = new Utf8(id.toString());
     datum.jobName = new Utf8(jobName);
     datum.userName = new Utf8(userName);
@@ -66,6 +89,18 @@ public class JobSubmittedEvent implement
     if (jobQueueName != null) {
       datum.jobQueueName = new Utf8(jobQueueName);
     }
+    if (workflowId != null) {
+      datum.workflowId = new Utf8(workflowId);
+    }
+    if (workflowName != null) {
+      datum.workflowName = new Utf8(workflowName);
+    }
+    if (workflowNodeName != null) {
+      datum.workflowNodeName = new Utf8(workflowNodeName);
+    }
+    if (workflowAdjacencies != null) {
+      datum.workflowAdjacencies = new Utf8(workflowAdjacencies);
+    }
   }
 
   JobSubmittedEvent() {}
@@ -105,6 +140,34 @@ public class JobSubmittedEvent implement
     }
     return jobAcls;
   }
+  /** Get the id of the workflow */
+  public String getWorkflowId() {
+    if (datum.workflowId != null) {
+      return datum.workflowId.toString();
+    }
+    return null;
+  }
+  /** Get the name of the workflow */
+  public String getWorkflowName() {
+    if (datum.workflowName != null) {
+      return datum.workflowName.toString();
+    }
+    return null;
+  }
+  /** Get the node name of the workflow */
+  public String getWorkflowNodeName() {
+    if (datum.workflowNodeName != null) {
+      return datum.workflowNodeName.toString();
+    }
+    return null;
+  }
+  /** Get the adjacencies of the workflow */
+  public String getWorkflowAdjacencies() {
+    if (datum.workflowAdjacencies != null) {
+      return datum.workflowAdjacencies.toString();
+    }
+    return null;
+  }
   /** Get the event type */
   public EventType getEventType() { return EventType.JOB_SUBMITTED; }
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java?rev=1440222&r1=1440221&r2=1440222&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java Tue Jan 29 23:53:59 2013
@@ -46,10 +46,13 @@ public class TaskAttemptStartedEvent imp
    * @param httpPort The port number of the tracker
    * @param shufflePort The shuffle port number of the container
    * @param containerId The containerId for the task attempt.
+   * @param locality The locality of the task attempt
+   * @param avataar The avataar of the task attempt
    */
   public TaskAttemptStartedEvent( TaskAttemptID attemptId,  
       TaskType taskType, long startTime, String trackerName,
-      int httpPort, int shufflePort, ContainerId containerId) {
+      int httpPort, int shufflePort, ContainerId containerId,
+      String locality, String avataar) {
     datum.attemptId = new Utf8(attemptId.toString());
     datum.taskid = new Utf8(attemptId.getTaskID().toString());
     datum.startTime = startTime;
@@ -58,14 +61,21 @@ public class TaskAttemptStartedEvent imp
     datum.httpPort = httpPort;
     datum.shufflePort = shufflePort;
     datum.containerId = new Utf8(containerId.toString());
+    if (locality != null) {
+      datum.locality = new Utf8(locality);
+    }
+    if (avataar != null) {
+      datum.avataar = new Utf8(avataar);
+    }
   }
 
   // TODO Remove after MrV1 is removed.
   // Using a dummy containerId to prevent jobHistory parse failures.
   public TaskAttemptStartedEvent(TaskAttemptID attemptId, TaskType taskType,
-      long startTime, String trackerName, int httpPort, int shufflePort) {
+      long startTime, String trackerName, int httpPort, int shufflePort,
+      String locality, String avataar) {
     this(attemptId, taskType, startTime, trackerName, httpPort, shufflePort,
-        ConverterUtils.toContainerId("container_-1_-1_-1_-1"));
+        ConverterUtils.toContainerId("container_-1_-1_-1_-1"), locality, avataar);
   }
 
   TaskAttemptStartedEvent() {}
@@ -105,4 +115,19 @@ public class TaskAttemptStartedEvent imp
   public ContainerId getContainerId() {
     return ConverterUtils.toContainerId(datum.containerId.toString());
   }
+  /** Get the locality */
+  public String getLocality() {
+    if (datum.locality != null) {
+      return datum.locality.toString();
+    }
+    return null;
+  }
+  /** Get the avataar */
+  public String getAvataar() {
+    if (datum.avataar != null) {
+      return datum.avataar.toString();
+    }
+    return null;
+  }
+
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java?rev=1440222&r1=1440221&r2=1440222&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java Tue Jan 29 23:53:59 2013
@@ -522,7 +522,7 @@ public class MergeManagerImpl<K, V> impl
       
       // 1. Prepare the list of files to be merged. 
       for (CompressAwarePath file : inputs) {
-        approxOutputSize += localFS.getFileStatus(file.getPath()).getLen();
+        approxOutputSize += localFS.getFileStatus(file).getLen();
       }
 
       // add the checksum length
@@ -753,12 +753,12 @@ public class MergeManagerImpl<K, V> impl
     CompressAwarePath[] onDisk = onDiskMapOutputs.toArray(
         new CompressAwarePath[onDiskMapOutputs.size()]);
     for (CompressAwarePath file : onDisk) {
-      long fileLength = fs.getFileStatus(file.getPath()).getLen();
+      long fileLength = fs.getFileStatus(file).getLen();
       onDiskBytes += fileLength;
       rawBytes += (file.getRawDataLength() > 0) ? file.getRawDataLength() : fileLength;
 
       LOG.debug("Disk file: " + file + " Length is " + fileLength);
-      diskSegments.add(new Segment<K, V>(job, fs, file.getPath(), codec, keepInputs,
+      diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs,
                                          (file.toString().endsWith(
                                              Task.MERGED_OUTPUT_PREFIX) ?
                                           null : mergedMapOutputsCounter), file.getRawDataLength()
@@ -806,23 +806,26 @@ public class MergeManagerImpl<K, V> impl
   
   }
 
-  static class CompressAwarePath
-  {
+  static class CompressAwarePath extends Path {
     private long rawDataLength;
 
-    private Path path;
-
     public CompressAwarePath(Path path, long rawDataLength) {
-      this.path = path;
+      super(path.toUri());
       this.rawDataLength = rawDataLength;
     }
 
     public long getRawDataLength() {
       return rawDataLength;
     }
-
-    public Path getPath() {
-      return path;
+    
+    @Override
+    public boolean equals(Object other) {
+      return super.equals(other);
+    }
+    
+    @Override
+    public int hashCode() {
+      return super.hashCode();
     }
   }
 }

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1438306-1440221