Posted to mapreduce-commits@hadoop.apache.org by sz...@apache.org on 2012/10/19 04:28:42 UTC

svn commit: r1399950 [3/11] - in /hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project: ./ bin/ conf/ dev-support/ hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduc...

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java Fri Oct 19 02:25:55 2012
@@ -28,7 +28,6 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -36,23 +35,17 @@ import java.util.Map;
 import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapTaskAttemptImpl;
-import org.apache.hadoop.mapred.WrappedJvmID;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion;
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@@ -79,18 +72,14 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.ClusterInfo;
 import org.apache.hadoop.yarn.SystemClock;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.Event;
@@ -101,82 +90,6 @@ import org.mockito.ArgumentCaptor;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
 public class TestTaskAttempt{
-  @Test
-  public void testAttemptContainerRequest() throws Exception {
-    //WARNING: This test must run first.  This is because there is an 
-    // optimization where the credentials passed in are cached statically so 
-    // they do not need to be recomputed when creating a new 
-    // ContainerLaunchContext. if other tests run first this code will cache
-    // their credentials and this test will fail trying to look for the
-    // credentials it inserted in.
-    final Text SECRET_KEY_ALIAS = new Text("secretkeyalias");
-    final byte[] SECRET_KEY = ("secretkey").getBytes();
-    Map<ApplicationAccessType, String> acls =
-        new HashMap<ApplicationAccessType, String>(1);
-    acls.put(ApplicationAccessType.VIEW_APP, "otheruser");
-    ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
-    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
-    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
-    Path jobFile = mock(Path.class);
-
-    EventHandler eventHandler = mock(EventHandler.class);
-    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
-    when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
-
-    JobConf jobConf = new JobConf();
-    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
-    jobConf.setBoolean("fs.file.impl.disable.cache", true);
-    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
-
-    // setup UGI for security so tokens and keys are preserved
-    jobConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
-    UserGroupInformation.setConfiguration(jobConf);
-
-    Credentials credentials = new Credentials();
-    credentials.addSecretKey(SECRET_KEY_ALIAS, SECRET_KEY);
-    Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>(
-        ("tokenid").getBytes(), ("tokenpw").getBytes(),
-        new Text("tokenkind"), new Text("tokenservice"));
-    
-    TaskAttemptImpl taImpl =
-        new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
-            mock(TaskSplitMetaInfo.class), jobConf, taListener,
-            mock(OutputCommitter.class), jobToken, credentials,
-            new SystemClock(), null);
-
-    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString());
-    ContainerId containerId = BuilderUtils.newContainerId(1, 1, 1, 1);
-    
-    ContainerLaunchContext launchCtx =
-        TaskAttemptImpl.createContainerLaunchContext(acls, containerId,
-            jobConf, jobToken, taImpl.createRemoteTask(),
-            TypeConverter.fromYarn(jobId), mock(Resource.class),
-            mock(WrappedJvmID.class), taListener,
-            credentials);
-
-    Assert.assertEquals("ACLs mismatch", acls, launchCtx.getApplicationACLs());
-    Credentials launchCredentials = new Credentials();
-
-    DataInputByteBuffer dibb = new DataInputByteBuffer();
-    dibb.reset(launchCtx.getContainerTokens());
-    launchCredentials.readTokenStorageStream(dibb);
-
-    // verify all tokens specified for the task attempt are in the launch context
-    for (Token<? extends TokenIdentifier> token : credentials.getAllTokens()) {
-      Token<? extends TokenIdentifier> launchToken =
-          launchCredentials.getToken(token.getService());
-      Assert.assertNotNull("Token " + token.getService() + " is missing",
-          launchToken);
-      Assert.assertEquals("Token " + token.getService() + " mismatch",
-          token, launchToken);
-    }
-
-    // verify the secret key is in the launch context
-    Assert.assertNotNull("Secret key missing",
-        launchCredentials.getSecretKey(SECRET_KEY_ALIAS));
-    Assert.assertTrue("Secret key mismatch", Arrays.equals(SECRET_KEY,
-        launchCredentials.getSecretKey(SECRET_KEY_ALIAS)));
-  }
 
   static public class StubbedFS extends RawLocalFileSystem {
     @Override
@@ -227,7 +140,7 @@ public class TestTaskAttempt{
     //Only a single occurrence of /DefaultRack
     assertEquals(1, requestedRacks.length);
   }
- 
+
   @Test
   public void testHostResolveAttempt() throws Exception {
     TaskAttemptImpl.RequestContainerTransition rct =
@@ -266,7 +179,7 @@ public class TestTaskAttempt{
     }
     assertEquals(0, expected.size());
   }
-  
+
   @Test
   public void testSlotMillisCounterUpdate() throws Exception {
     verifySlotMillis(2048, 2048, 1024);
@@ -325,13 +238,13 @@ public class TestTaskAttempt{
             .getAllCounters().findCounter(JobCounter.SLOTS_MILLIS_REDUCES)
             .getValue());
   }
-  
+
   private TaskAttemptImpl createMapTaskAttemptImplForTest(
       EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo) {
     Clock clock = new SystemClock();
     return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo, clock);
   }
-  
+
   private TaskAttemptImpl createMapTaskAttemptImplForTest(
       EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) {
     ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
@@ -402,30 +315,30 @@ public class TestTaskAttempt{
       };
     }
   }
-  
+
   @Test
   public void testLaunchFailedWhileKilling() throws Exception {
     ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
-    ApplicationAttemptId appAttemptId = 
+    ApplicationAttemptId appAttemptId =
       BuilderUtils.newApplicationAttemptId(appId, 0);
     JobId jobId = MRBuilderUtils.newJobId(appId, 1);
     TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
     TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
     Path jobFile = mock(Path.class);
-    
+
     MockEventHandler eventHandler = new MockEventHandler();
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
     when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
-    
+
     JobConf jobConf = new JobConf();
     jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
     jobConf.setBoolean("fs.file.impl.disable.cache", true);
     jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
     jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
-    
+
     TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
     when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
-    
+
     TaskAttemptImpl taImpl =
       new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
           splits, jobConf, taListener,
@@ -437,7 +350,7 @@ public class TestTaskAttempt{
     Container container = mock(Container.class);
     when(container.getId()).thenReturn(contId);
     when(container.getNodeId()).thenReturn(nid);
-    
+
     taImpl.handle(new TaskAttemptEvent(attemptId,
         TaskAttemptEventType.TA_SCHEDULE));
     taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
@@ -450,7 +363,7 @@ public class TestTaskAttempt{
         TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
     assertFalse(eventHandler.internalError);
   }
-  
+
   @Test
   public void testContainerCleanedWhileRunning() throws Exception {
     ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
@@ -566,9 +479,76 @@ public class TestTaskAttempt{
         eventHandler.internalError);
   }
 
+  @Test
+  public void testDoubleTooManyFetchFailure() throws Exception {
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+    ApplicationAttemptId appAttemptId =
+      BuilderUtils.newApplicationAttemptId(appId, 0);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    Path jobFile = mock(Path.class);
+
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+    when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
+
+    AppContext appCtx = mock(AppContext.class);
+    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+    Resource resource = mock(Resource.class);
+    when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+    when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
+    when(resource.getMemory()).thenReturn(1024);
+
+    TaskAttemptImpl taImpl =
+      new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+          splits, jobConf, taListener,
+          mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+          new SystemClock(), appCtx);
+
+    NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+    ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_SCHEDULE));
+    taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
+        container, mock(Map.class)));
+    taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_DONE));
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_CONTAINER_CLEANED));
+
+    assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
+        TaskAttemptState.SUCCEEDED);
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
+    assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
+        TaskAttemptState.FAILED);
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
+    assertEquals("Task attempt is not in FAILED state, still", taImpl.getState(),
+        TaskAttemptState.FAILED);
+    assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
+        eventHandler.internalError);
+  }
+
   public static class MockEventHandler implements EventHandler {
     public boolean internalError;
-    
+
     @Override
     public void handle(Event event) {
       if (event instanceof JobEvent) {
@@ -578,6 +558,6 @@ public class TestTaskAttempt{
         }
       }
     }
-    
+
   };
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Fri Oct 19 02:25:55 2012
@@ -84,7 +84,6 @@ public class TestTaskImpl {
   private ApplicationId appId;
   private TaskSplitMetaInfo taskSplitMetaInfo;  
   private String[] dataLocations = new String[0]; 
-  private final TaskType taskType = TaskType.MAP;
   private AppContext appContext;
   
   private int startCount = 0;
@@ -97,6 +96,7 @@ public class TestTaskImpl {
   private class MockTaskImpl extends TaskImpl {
         
     private int taskAttemptCounter = 0;
+    TaskType taskType;
 
     public MockTaskImpl(JobId jobId, int partition,
         EventHandler eventHandler, Path remoteJobConfFile, JobConf conf,
@@ -104,11 +104,12 @@ public class TestTaskImpl {
         Token<JobTokenIdentifier> jobToken,
         Credentials credentials, Clock clock,
         Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
-        MRAppMetrics metrics, AppContext appContext) {
+        MRAppMetrics metrics, AppContext appContext, TaskType taskType) {
       super(jobId, taskType , partition, eventHandler,
           remoteJobConfFile, conf, taskAttemptListener, committer, 
           jobToken, credentials, clock,
           completedTasksFromPreviousRun, startCount, metrics, appContext);
+      this.taskType = taskType;
     }
 
     @Override
@@ -120,7 +121,7 @@ public class TestTaskImpl {
     protected TaskAttemptImpl createAttempt() {
       MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getID(), ++taskAttemptCounter, 
           eventHandler, taskAttemptListener, remoteJobConfFile, partition,
-          conf, committer, jobToken, credentials, clock, appContext);
+          conf, committer, jobToken, credentials, clock, appContext, taskType);
       taskAttempts.add(attempt);
       return attempt;
     }
@@ -142,18 +143,20 @@ public class TestTaskImpl {
     private float progress = 0;
     private TaskAttemptState state = TaskAttemptState.NEW;
     private TaskAttemptId attemptId;
+    private TaskType taskType;
 
     public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler,
         TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
         JobConf conf, OutputCommitter committer,
         Token<JobTokenIdentifier> jobToken,
         Credentials credentials, Clock clock,
-        AppContext appContext) {
+        AppContext appContext, TaskType taskType) {
       super(taskId, id, eventHandler, taskAttemptListener, jobFile, partition, conf,
           dataLocations, committer, jobToken, credentials, clock, appContext);
       attemptId = Records.newRecord(TaskAttemptId.class);
       attemptId.setId(id);
       attemptId.setTaskId(taskId);
+      this.taskType = taskType;
     }
 
     public TaskAttemptId getAttemptId() {
@@ -162,7 +165,7 @@ public class TestTaskImpl {
     
     @Override
     protected Task createRemoteTask() {
-      return new MockTask();
+      return new MockTask(taskType);
     }    
     
     public float getProgress() {
@@ -185,6 +188,11 @@ public class TestTaskImpl {
   
   private class MockTask extends Task {
 
+    private TaskType taskType;
+    MockTask(TaskType taskType) {
+      this.taskType = taskType;
+    }
+    
     @Override
     public void run(JobConf job, TaskUmbilicalProtocol umbilical)
         throws IOException, ClassNotFoundException, InterruptedException {
@@ -193,7 +201,7 @@ public class TestTaskImpl {
 
     @Override
     public boolean isMapTask() {
-      return true;
+      return (taskType == TaskType.MAP);
     }    
     
   }
@@ -227,14 +235,15 @@ public class TestTaskImpl {
     taskSplitMetaInfo = mock(TaskSplitMetaInfo.class);
     when(taskSplitMetaInfo.getLocations()).thenReturn(dataLocations); 
     
-    taskAttempts = new ArrayList<MockTaskAttemptImpl>();
-    
-    mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
+    taskAttempts = new ArrayList<MockTaskAttemptImpl>();    
+  }
+  
+  private MockTaskImpl createMockTask(TaskType taskType) {
+    return new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
         remoteJobConfFile, conf, taskAttemptListener, committer, jobToken,
         credentials, clock,
         completedTasksFromPreviousRun, startCount,
-        metrics, appContext);        
-    
+        metrics, appContext, taskType);
   }
 
   @After 
@@ -342,6 +351,7 @@ public class TestTaskImpl {
   @Test
   public void testInit() {
     LOG.info("--- START: testInit ---");
+    mockTask = createMockTask(TaskType.MAP);        
     assertTaskNewState();
     assert(taskAttempts.size() == 0);
   }
@@ -352,6 +362,7 @@ public class TestTaskImpl {
    */
   public void testScheduleTask() {
     LOG.info("--- START: testScheduleTask ---");
+    mockTask = createMockTask(TaskType.MAP);        
     TaskId taskId = getNewTaskID();
     scheduleTaskAttempt(taskId);
   }
@@ -362,6 +373,7 @@ public class TestTaskImpl {
    */
   public void testKillScheduledTask() {
     LOG.info("--- START: testKillScheduledTask ---");
+    mockTask = createMockTask(TaskType.MAP);        
     TaskId taskId = getNewTaskID();
     scheduleTaskAttempt(taskId);
     killTask(taskId);
@@ -374,6 +386,7 @@ public class TestTaskImpl {
    */
   public void testKillScheduledTaskAttempt() {
     LOG.info("--- START: testKillScheduledTaskAttempt ---");
+    mockTask = createMockTask(TaskType.MAP);        
     TaskId taskId = getNewTaskID();
     scheduleTaskAttempt(taskId);
     killScheduledTaskAttempt(getLastAttempt().getAttemptId());
@@ -386,6 +399,7 @@ public class TestTaskImpl {
    */
   public void testLaunchTaskAttempt() {
     LOG.info("--- START: testLaunchTaskAttempt ---");
+    mockTask = createMockTask(TaskType.MAP);        
     TaskId taskId = getNewTaskID();
     scheduleTaskAttempt(taskId);
     launchTaskAttempt(getLastAttempt().getAttemptId());
@@ -398,6 +412,7 @@ public class TestTaskImpl {
    */
   public void testKillRunningTaskAttempt() {
     LOG.info("--- START: testKillRunningTaskAttempt ---");
+    mockTask = createMockTask(TaskType.MAP);        
     TaskId taskId = getNewTaskID();
     scheduleTaskAttempt(taskId);
     launchTaskAttempt(getLastAttempt().getAttemptId());
@@ -407,6 +422,7 @@ public class TestTaskImpl {
   @Test 
   public void testTaskProgress() {
     LOG.info("--- START: testTaskProgress ---");
+    mockTask = createMockTask(TaskType.MAP);        
         
     // launch task
     TaskId taskId = getNewTaskID();
@@ -444,6 +460,7 @@ public class TestTaskImpl {
   
   @Test
   public void testFailureDuringTaskAttemptCommit() {
+    mockTask = createMockTask(TaskType.MAP);        
     TaskId taskId = getNewTaskID();
     scheduleTaskAttempt(taskId);
     launchTaskAttempt(getLastAttempt().getAttemptId());
@@ -469,8 +486,7 @@ public class TestTaskImpl {
     assertTaskSucceededState();
   }
   
-  @Test
-  public void testSpeculativeTaskAttemptSucceedsEvenIfFirstFails() {
+  private void runSpeculativeTaskAttemptSucceedsEvenIfFirstFails(TaskEventType failEvent) {
     TaskId taskId = getNewTaskID();
     scheduleTaskAttempt(taskId);
     launchTaskAttempt(getLastAttempt().getAttemptId());
@@ -489,11 +505,34 @@ public class TestTaskImpl {
     
     // Now fail the first task attempt, after the second has succeeded
     mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(0).getAttemptId(), 
-        TaskEventType.T_ATTEMPT_FAILED));
+        failEvent));
     
     // The task should still be in the succeeded state
     assertTaskSucceededState();
-    
+  }
+  
+  @Test
+  public void testMapSpeculativeTaskAttemptSucceedsEvenIfFirstFails() {
+    mockTask = createMockTask(TaskType.MAP);        
+    runSpeculativeTaskAttemptSucceedsEvenIfFirstFails(TaskEventType.T_ATTEMPT_FAILED);
+  }
+
+  @Test
+  public void testReduceSpeculativeTaskAttemptSucceedsEvenIfFirstFails() {
+    mockTask = createMockTask(TaskType.REDUCE);        
+    runSpeculativeTaskAttemptSucceedsEvenIfFirstFails(TaskEventType.T_ATTEMPT_FAILED);
+  }
+  
+  @Test
+  public void testMapSpeculativeTaskAttemptSucceedsEvenIfFirstIsKilled() {
+    mockTask = createMockTask(TaskType.MAP);        
+    runSpeculativeTaskAttemptSucceedsEvenIfFirstFails(TaskEventType.T_ATTEMPT_KILLED);
+  }
+
+  @Test
+  public void testReduceSpeculativeTaskAttemptSucceedsEvenIfFirstIsKilled() {
+    mockTask = createMockTask(TaskType.REDUCE);        
+    runSpeculativeTaskAttemptSucceedsEvenIfFirstFails(TaskEventType.T_ATTEMPT_KILLED);
   }
 
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesJobs.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesJobs.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesJobs.java Fri Oct 19 02:25:55 2012
@@ -972,7 +972,8 @@ public class TestAMWebServicesJobs exten
         WebServicesTestUtils.checkStringMatch("containerId", amInfo
             .getContainerId().toString(), containerId);
 
-        String localLogsLink = ujoin("node", "containerlogs", containerId);
+        String localLogsLink = ujoin("node", "containerlogs", containerId,
+            job.getUserName());
 
         assertTrue("logsLink", logsLink.contains(localLogsLink));
       }
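
For context, ujoin (a YARN StringHelper utility that, judging by its use here, joins URL path components with "/") now builds the per-user container-logs path, so the assertion expects .../node/containerlogs/<containerId>/<user> rather than the bare container link. A hedged illustration with hypothetical IDs:

    import static org.apache.hadoop.yarn.util.StringHelper.ujoin;

    public class LogsLinkExample {
      public static void main(String[] args) {
        // Hypothetical container ID and user; the joined fragment is what
        // logsLink is asserted to contain.
        System.out.println(ujoin("node", "containerlogs",
            "container_1350419200000_0001_01_000001", "testuser"));
      }
    }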

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml Fri Oct 19 02:25:55 2012
@@ -39,6 +39,10 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-core</artifactId>
     </dependency>
     <dependency>
@@ -77,7 +81,7 @@
             <configuration>
               <executable>protoc</executable>
               <arguments>
-                <argument>-I../../hadoop-yarn/hadoop-yarn-api/src/main/proto/</argument>
+                <argument>-I../../../hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/</argument>
                 <argument>-Isrc/main/proto/</argument>
                 <argument>--java_out=target/generated-sources/proto</argument>
                 <argument>src/main/proto/mr_protos.proto</argument>

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java Fri Oct 19 02:25:55 2012
@@ -18,12 +18,9 @@
 
 package org.apache.hadoop.mapred;
 
-import com.google.common.collect.Maps;
-
 import java.io.File;
 import java.io.IOException;
 import java.net.MalformedURLException;
-import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.URLClassLoader;
@@ -34,6 +31,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -60,6 +58,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.FSDownload;
 
+import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
@@ -85,6 +84,8 @@ class LocalDistributedCacheManager {
    * @throws IOException
    */
   public void setup(JobConf conf) throws IOException {
+    File workDir = new File(System.getProperty("user.dir"));
+    
     // Generate YARN local resources objects corresponding to the distributed
     // cache configuration
     Map<String, LocalResource> localResources = 
@@ -132,7 +133,8 @@ class LocalDistributedCacheManager {
         Future<Path> future = exec.submit(download);
         resourcesToPaths.put(resource, future);
       }
-      for (LocalResource resource : localResources.values()) {
+      for (Entry<String, LocalResource> entry : localResources.entrySet()) {
+        LocalResource resource = entry.getValue();
         Path path;
         try {
           path = resourcesToPaths.get(resource).get();
@@ -142,10 +144,18 @@ class LocalDistributedCacheManager {
           throw new IOException(e);
         }
         String pathString = path.toUri().toString();
+        String link = entry.getKey();
+        String target = new File(path.toUri()).getPath();
+        symlink(workDir, target, link);
+        
         if (resource.getType() == LocalResourceType.ARCHIVE) {
           localArchives.add(pathString);
         } else if (resource.getType() == LocalResourceType.FILE) {
           localFiles.add(pathString);
+        } else if (resource.getType() == LocalResourceType.PATTERN) {
+          //PATTERN is not currently used in local mode
+          throw new IllegalArgumentException("Resource type PATTERN is not " +
+          		"implemented yet. " + resource.getResource());
         }
         Path resourcePath;
         try {
@@ -175,27 +185,6 @@ class LocalDistributedCacheManager {
           .arrayToString(localFiles.toArray(new String[localArchives
               .size()])));
     }
-    if (DistributedCache.getSymlink(conf)) {
-      File workDir = new File(System.getProperty("user.dir"));
-      URI[] archives = DistributedCache.getCacheArchives(conf);
-      URI[] files = DistributedCache.getCacheFiles(conf);
-      Path[] localArchives = DistributedCache.getLocalCacheArchives(conf);
-      Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
-      if (archives != null) {
-        for (int i = 0; i < archives.length; i++) {
-          String link = archives[i].getFragment();
-          String target = new File(localArchives[i].toUri()).getPath();
-          symlink(workDir, target, link);
-        }
-      }
-      if (files != null) {
-        for (int i = 0; i < files.length; i++) {
-          String link = files[i].getFragment();
-          String target = new File(localFiles[i].toUri()).getPath();
-          symlink(workDir, target, link);
-        }
-      }
-    }
     setupCalled = true;
   }
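
The rewritten loop above is the heart of this file's change: every localized resource is now symlinked into the task working directory under its map key (the link name), unconditionally, which is why the DistributedCache.getSymlink(conf) block below it could be removed. A minimal sketch of the per-entry idea, standing in for the symlink(workDir, target, link) helper calls (the helper itself is not shown in this hunk):

    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.util.Map;

    class LinkSketch {
      // Hypothetical stand-in for the symlink(workDir, target, link) calls:
      // link every localized path into workDir under the link name the job
      // asked for (the localResources map key).
      static void linkAll(File workDir, Map<String, File> localized)
          throws IOException {
        for (Map.Entry<String, File> e : localized.entrySet()) {
          File link = new File(workDir, e.getKey());
          if (!link.exists()) {
            Files.createSymbolicLink(link.toPath(),
                e.getValue().getAbsoluteFile().toPath());
          }
        }
      }
    }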
   

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java Fri Oct 19 02:25:55 2012
@@ -383,6 +383,7 @@ public class TypeConverter {
     switch (yarnApplicationState) {
     case NEW:
     case SUBMITTED:
+    case ACCEPTED:
       return State.PREP;
     case RUNNING:
       return State.RUNNING;

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java Fri Oct 19 02:25:55 2012
@@ -61,11 +61,6 @@ public class JHAdminConfig {
     MR_HISTORY_PREFIX + "datestring.cache.size";
   public static final int DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE = 200000;
   
-  //TODO REMOVE debug-mode
-  /** Equivalent to 0.20 mapreduce.jobhistory.debug.mode */
-  public static final String MR_HISTORY_DEBUG_MODE = 
-    MR_HISTORY_PREFIX + "debug-mode";
-  
   /** Path where history files should be stored for DONE jobs. **/
   public static final String MR_HISTORY_DONE_DIR =
     MR_HISTORY_PREFIX + "done-dir";
@@ -133,5 +128,16 @@ public class JHAdminConfig {
    * The HistoryStorage class to use to cache history data.
    */
   public static final String MR_HISTORY_STORAGE =
-    MR_HISTORY_PREFIX + ".store.class";
+    MR_HISTORY_PREFIX + "store.class";
+
+  /** Whether to use fixed ports with the minicluster. */
+  public static final String MR_HISTORY_MINICLUSTER_FIXED_PORTS = MR_HISTORY_PREFIX
+       + "minicluster.fixed.ports";
+
+  /**
+   * The default is false so that tests can run concurrently without port
+   * conflicts.
+   */
+  public static boolean DEFAULT_MR_HISTORY_MINICLUSTER_FIXED_PORTS = false;
+
 }
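
The new key and default above make fixed minicluster ports opt-in. A hedged usage sketch (Configuration.setBoolean is standard Hadoop API; the surrounding test harness is assumed):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;

    public class FixedPortsExample {
      public static Configuration miniClusterConf() {
        Configuration conf = new Configuration();
        // Opt in to fixed ports only when a test needs well-known addresses;
        // the default (false) lets concurrent test runs pick free ports.
        conf.setBoolean(JHAdminConfig.MR_HISTORY_MINICLUSTER_FIXED_PORTS, true);
        return conf;
      }
    }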

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java Fri Oct 19 02:25:55 2012
@@ -79,6 +79,13 @@ public class JobHistoryUtils {
 
   public static final FsPermission HISTORY_DONE_FILE_PERMISSION =
     FsPermission.createImmutable((short) 0770); // rwx------
+
+  /**
+   * Umask for the done dir and derivatives.
+   */
+  public static final FsPermission HISTORY_DONE_DIR_UMASK = FsPermission
+      .createImmutable((short) (0770 ^ 0777));
+
   
   /**
    * Permissions for the intermediate done directory.
@@ -336,20 +343,19 @@ public class JobHistoryUtils {
   /**
    * Gets the timestamp component based on millisecond time.
    * @param millisecondTime
-   * @param debugMode
    * @return the timestamp component based on millisecond time
    */
-  public static String timestampDirectoryComponent(long millisecondTime, boolean debugMode) {
+  public static String timestampDirectoryComponent(long millisecondTime) {
     Calendar timestamp = Calendar.getInstance();
     timestamp.setTimeInMillis(millisecondTime);
     String dateString = null;
-    dateString = String.format(
-        TIMESTAMP_DIR_FORMAT,
-        timestamp.get(Calendar.YEAR),
-        // months are 0-based in Calendar, but people will expect January
-        // to be month #1.
-        timestamp.get(debugMode ? Calendar.HOUR : Calendar.MONTH) + 1,
-        timestamp.get(debugMode ? Calendar.MINUTE : Calendar.DAY_OF_MONTH));
+    dateString = String
+        .format(TIMESTAMP_DIR_FORMAT,
+            timestamp.get(Calendar.YEAR),
+            // months are 0-based in Calendar, but people will expect January to
+            // be month #1.
+            timestamp.get(Calendar.MONTH) + 1,
+            timestamp.get(Calendar.DAY_OF_MONTH));
     dateString = dateString.intern();
     return dateString;
   }
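
Two details in the JobHistoryUtils hunks are easy to misread. The umask constant is 0770 ^ 0777, i.e. 0007, so applying it to full permissions still yields rwxrwx--- (0770) for the done-dir tree; and with debugMode gone, timestampDirectoryComponent() always formats year/month/day. A hedged sketch of both (TIMESTAMP_DIR_FORMAT itself is not shown in this diff and is assumed to be a %04d/%02d/%02d-style pattern; Calendar uses the JVM default time zone):

    import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;

    public class DoneDirLayoutExample {
      public static void main(String[] args) {
        // 0770 ^ 0777 == 0007: masking full permissions with the new umask
        // restores the intended rwxrwx--- on the done dir and its children.
        int umask = 0770 ^ 0777;
        System.out.println(Integer.toOctalString(0777 & ~umask)); // prints 770

        // This commit's timestamp (Fri Oct 19 02:25:55 2012 UTC) would print
        // "2012/10/19" on a UTC-default JVM, under the assumed format.
        System.out.println(
            JobHistoryUtils.timestampDirectoryComponent(1350613555000L));
      }
    }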

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java Fri Oct 19 02:25:55 2012
@@ -35,6 +35,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.InvalidJobConfException;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -56,6 +57,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 
 /**
  * Helper class for MR applications
@@ -171,8 +173,15 @@ public class MRApps extends Apps {
       }
 
       // Add standard Hadoop classes
-      for (String c : conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH)
-          .split(",")) {
+      for (String c : conf.getStrings(
+          YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+          YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+        Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), c
+            .trim());
+      }
+      for (String c : conf.getStrings(
+          MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH,
+          MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH)) {
         Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), c
             .trim());
       }
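
The switch to conf.getStrings(key, defaults) above is more than cosmetic: the old conf.get(YARN_APPLICATION_CLASSPATH).split(",") throws a NullPointerException whenever the key is unset, while getStrings falls back to the supplied defaults, and the same pattern now also appends the MapReduce-specific classpath. A minimal sketch of the pattern with a hypothetical key and defaults:

    import org.apache.hadoop.conf.Configuration;

    public class ClasspathDefaultsExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration(false); // nothing set
        // conf.get("example.classpath").split(",") would NPE here; getStrings
        // with a default array degrades gracefully instead.
        for (String c : conf.getStrings("example.classpath",
            "/opt/example/*", "/opt/example/lib/*")) {
          System.out.println(c.trim());
        }
      }
    }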
@@ -201,7 +210,15 @@ public class MRApps extends Apps {
     Apps.addToEnvironment(
         environment,
         Environment.CLASSPATH.name(),
-        MRJobConfig.JOB_JAR);
+        MRJobConfig.JOB_JAR + Path.SEPARATOR + MRJobConfig.JOB_JAR);
+    Apps.addToEnvironment(
+        environment,
+        Environment.CLASSPATH.name(),
+        MRJobConfig.JOB_JAR + Path.SEPARATOR + "classes" + Path.SEPARATOR);
+    Apps.addToEnvironment(
+        environment,
+        Environment.CLASSPATH.name(),
+        MRJobConfig.JOB_JAR + Path.SEPARATOR + "lib" + Path.SEPARATOR + "*");
     Apps.addToEnvironment(
         environment,
         Environment.CLASSPATH.name(),
@@ -263,6 +280,13 @@ public class MRApps extends Apps {
         DistributedCache.getFileClassPaths(conf));
   }
 
+  private static String getResourceDescription(LocalResourceType type) {
+    if(type == LocalResourceType.ARCHIVE || type == LocalResourceType.PATTERN) {
+      return "cache archive (" + MRJobConfig.CACHE_ARCHIVES + ") ";
+    }
+    return "cache file (" + MRJobConfig.CACHE_FILES + ") ";
+  }
+  
   // TODO - Move this to MR!
   // Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[], 
   // long[], boolean[], Path[], FileType)
@@ -308,6 +332,13 @@ public class MRApps extends Apps {
           throw new IllegalArgumentException("Resource name must be relative");
         }
         String linkName = name.toUri().getPath();
+        LocalResource orig = localResources.get(linkName);
+        if(orig != null && !orig.getResource().equals(
+            ConverterUtils.getYarnUrlFromURI(p.toUri()))) {
+          throw new InvalidJobConfException(
+              getResourceDescription(orig.getType()) + orig.getResource() + 
+              " conflicts with " + getResourceDescription(type) + u);
+        }
         localResources.put(
             linkName,
             BuilderUtils.newLocalResource(
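
The new guard above turns a silent last-one-wins overwrite of localResources into a fast InvalidJobConfException whenever two distributed-cache entries localize to the same link name but point at different URLs. Since the link name comes from the URI fragment when one is given, the collision is easy to construct, as the new TestMRApps cases later in this commit do. A hedged illustration of the colliding inputs:

    import java.net.URI;

    public class CacheLinkConflict {
      public static void main(String[] args) {
        // Both fragments are "something", so both entries would localize to
        // ./something even though they name different files; the guard now
        // rejects the job configuration instead of letting the second win.
        URI archive = URI.create("mockfs://mock/tmp/something.zip#something");
        URI file = URI.create("mockfs://mock/tmp/something.txt#something");
        System.out.println(archive.getFragment().equals(file.getFragment()));
      }
    }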

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java Fri Oct 19 02:25:55 2012
@@ -67,7 +67,7 @@ public class MRBuilderUtils {
       String userName, JobState state, long submitTime, long startTime, long finishTime,
       float setupProgress, float mapProgress, float reduceProgress,
       float cleanupProgress, String jobFile, List<AMInfo> amInfos,
-      boolean isUber) {
+      boolean isUber, String diagnostics) {
     JobReport report = Records.newRecord(JobReport.class);
     report.setJobId(jobId);
     report.setJobName(jobName);
@@ -83,6 +83,7 @@ public class MRBuilderUtils {
     report.setJobFile(jobFile);
     report.setAMInfos(amInfos);
     report.setIsUber(isUber);
+    report.setDiagnostics(diagnostics);
     return report;
   }
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java Fri Oct 19 02:25:55 2012
@@ -117,7 +117,8 @@ public class TestMRWithDistributedCache 
       TestCase.assertEquals("symlink distributed.first.symlink length not 1", 1,
           symlinkFile.length());
       
-      TestCase.assertFalse("second file should not be symlinked",
+      //This last one is a difference between MRv2 and MRv1
+      TestCase.assertTrue("second file should be symlinked too",
           expectedAbsentSymlinkFile.exists());
     }
   }
@@ -145,7 +146,6 @@ public class TestMRWithDistributedCache 
     job.addFileToClassPath(second);
     job.addArchiveToClassPath(third);
     job.addCacheArchive(fourth.toUri());
-    job.createSymlink();
     job.setMaxMapAttempts(1); // speed up failures
 
     job.submit();

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java Fri Oct 19 02:25:55 2012
@@ -19,25 +19,33 @@
 package org.apache.hadoop.mapreduce.v2.util;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.InvalidJobConfException;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
-import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 
 import org.junit.Test;
 import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
 
 public class TestMRApps {
 
@@ -132,11 +140,19 @@ public class TestMRApps {
     Map<String, String> environment = new HashMap<String, String>();
     MRApps.setClasspath(environment, job.getConfiguration());
     assertTrue(environment.get("CLASSPATH").startsWith("$PWD:"));
-    String confClasspath = job.getConfiguration().get(YarnConfiguration.YARN_APPLICATION_CLASSPATH);
-    if (confClasspath != null) {
-      confClasspath = confClasspath.replaceAll(",\\s*", ":").trim();
+    String yarnAppClasspath = 
+        job.getConfiguration().get(
+            YarnConfiguration.YARN_APPLICATION_CLASSPATH);
+    if (yarnAppClasspath != null) {
+      yarnAppClasspath = yarnAppClasspath.replaceAll(",\\s*", ":").trim();
     }
-    assertTrue(environment.get("CLASSPATH").contains(confClasspath));
+    assertTrue(environment.get("CLASSPATH").contains(yarnAppClasspath));
+    String mrAppClasspath = 
+        job.getConfiguration().get(MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH);
+    if (mrAppClasspath != null) {
+      mrAppClasspath = mrAppClasspath.replaceAll(",\\s*", ":").trim();
+    }
+    assertTrue(environment.get("CLASSPATH").contains(mrAppClasspath));
   }
 
  @Test public void testSetClasspathWithUserPrecendence() {
@@ -150,7 +166,7 @@ public class TestMRApps {
     }
     String env_str = env.get("CLASSPATH");
     assertSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST set, but not taking effect!",
-      env_str.indexOf("$PWD:job.jar"), 0);
+      env_str.indexOf("$PWD:job.jar/job.jar:job.jar/classes/:job.jar/lib/*:$PWD/*"), 0);
   }
 
   @Test public void testSetClasspathWithNoUserPrecendence() {
@@ -163,8 +179,129 @@ public class TestMRApps {
       fail("Got exception while setting classpath");
     }
     String env_str = env.get("CLASSPATH");
+    int index = 
+         env_str.indexOf("job.jar/job.jar:job.jar/classes/:job.jar/lib/*:$PWD/*");
+    assertNotSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, and job.jar is not"
+            + " in the classpath!", index, -1);
     assertNotSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, but taking effect!",
-      env_str.indexOf("$PWD:job.jar"), 0);
+      index, 0);
   }
-
+  
+  @Test
+  public void testSetupDistributedCacheEmpty() throws IOException {
+    Configuration conf = new Configuration();
+    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+    MRApps.setupDistributedCache(conf, localResources);
+    assertTrue("Empty Config did not produce an empty list of resources",
+        localResources.isEmpty());
+  }
+  
+  @SuppressWarnings("deprecation")
+  @Test(expected = InvalidJobConfException.class)
+  public void testSetupDistributedCacheConflicts() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
+    
+    URI mockUri = URI.create("mockfs://mock/");
+    FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
+        .getRawFileSystem();
+    
+    URI archive = new URI("mockfs://mock/tmp/something.zip#something");
+    Path archivePath = new Path(archive);
+    URI file = new URI("mockfs://mock/tmp/something.txt#something");
+    Path filePath = new Path(file);
+    
+    when(mockFs.resolvePath(archivePath)).thenReturn(archivePath);
+    when(mockFs.resolvePath(filePath)).thenReturn(filePath);
+    
+    DistributedCache.addCacheArchive(archive, conf);
+    conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10");
+    conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10");
+    conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true");
+    DistributedCache.addCacheFile(file, conf);
+    conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11");
+    conf.set(MRJobConfig.CACHE_FILES_SIZES, "11");
+    conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true");
+    Map<String, LocalResource> localResources = 
+      new HashMap<String, LocalResource>();
+    MRApps.setupDistributedCache(conf, localResources);
+  }
+  
+  @SuppressWarnings("deprecation")
+  @Test(expected = InvalidJobConfException.class)
+  public void testSetupDistributedCacheConflictsFiles() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
+    
+    URI mockUri = URI.create("mockfs://mock/");
+    FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
+        .getRawFileSystem();
+    
+    URI file = new URI("mockfs://mock/tmp/something.zip#something");
+    Path filePath = new Path(file);
+    URI file2 = new URI("mockfs://mock/tmp/something.txt#something");
+    Path file2Path = new Path(file2);
+    
+    when(mockFs.resolvePath(filePath)).thenReturn(filePath);
+    when(mockFs.resolvePath(file2Path)).thenReturn(file2Path);
+    
+    DistributedCache.addCacheFile(file, conf);
+    DistributedCache.addCacheFile(file2, conf);
+    conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "10,11");
+    conf.set(MRJobConfig.CACHE_FILES_SIZES, "10,11");
+    conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true,true");
+    Map<String, LocalResource> localResources = 
+      new HashMap<String, LocalResource>();
+    MRApps.setupDistributedCache(conf, localResources);
+  }
+  
+  @SuppressWarnings("deprecation")
+  @Test
+  public void testSetupDistributedCache() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
+    
+    URI mockUri = URI.create("mockfs://mock/");
+    FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
+        .getRawFileSystem();
+    
+    URI archive = new URI("mockfs://mock/tmp/something.zip");
+    Path archivePath = new Path(archive);
+    URI file = new URI("mockfs://mock/tmp/something.txt#something");
+    Path filePath = new Path(file);
+    
+    when(mockFs.resolvePath(archivePath)).thenReturn(archivePath);
+    when(mockFs.resolvePath(filePath)).thenReturn(filePath);
+    
+    DistributedCache.addCacheArchive(archive, conf);
+    conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10");
+    conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10");
+    conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true");
+    DistributedCache.addCacheFile(file, conf);
+    conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11");
+    conf.set(MRJobConfig.CACHE_FILES_SIZES, "11");
+    conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true");
+    Map<String, LocalResource> localResources = 
+      new HashMap<String, LocalResource>();
+    MRApps.setupDistributedCache(conf, localResources);
+    assertEquals(2, localResources.size());
+    LocalResource lr = localResources.get("something.zip");
+    assertNotNull(lr);
+    assertEquals(10l, lr.getSize());
+    assertEquals(10l, lr.getTimestamp());
+    assertEquals(LocalResourceType.ARCHIVE, lr.getType());
+    lr = localResources.get("something");
+    assertNotNull(lr);
+    assertEquals(11l, lr.getSize());
+    assertEquals(11l, lr.getTimestamp());
+    assertEquals(LocalResourceType.FILE, lr.getType());
+  }
+  
+  static class MockFileSystem extends FilterFileSystem {
+    MockFileSystem() {
+      super(mock(FileSystem.class));
+    }
+    public void initialize(URI name, Configuration conf) throws IOException {}
+  }
+  
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml Fri Oct 19 02:25:55 2012
@@ -68,6 +68,24 @@
         </executions>
       </plugin>
       <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>add-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>target/generated-sources/avro</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-antrun-plugin</artifactId>
         <executions>

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java Fri Oct 19 02:25:55 2012
@@ -48,8 +48,12 @@ import org.apache.hadoop.mapreduce.Job;
  * Archives (zip, tar and tgz/tar.gz files) are un-archived at the slave nodes. 
  * Jars may be optionally added to the classpath of the tasks, a rudimentary 
  * software distribution mechanism.  Files have execution permissions.
- * Optionally users can also direct it to symlink the distributed cache file(s)
- * into the working directory of the task.</p>
+ * In older versions of Hadoop Map/Reduce, users could optionally ask for
+ * symlinks to be created in the working directory of the child task.  In the
+ * current version symlinks are always created.  If the URL does not have a
+ * fragment, the name of the file or directory will be used.  If multiple
+ * files or directories map to the same link name, the last one added will be
+ * used; all others will not even be downloaded.</p>
  * 
  * <p><code>DistributedCache</code> tracks modification timestamps of the cache 
  * files. Clearly the cache files should not be modified by the application 
@@ -91,8 +95,7 @@ import org.apache.hadoop.mapreduce.Job;
  *       
  *       public void configure(JobConf job) {
  *         // Get the cached archives/files
- *         localArchives = DistributedCache.getLocalCacheArchives(job);
- *         localFiles = DistributedCache.getLocalCacheFiles(job);
+ *         File f = new File("./map.zip/some/file/in/zip.txt");
  *       }
  *       
  *       public void map(K key, V value, 

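A minimal sketch of the always-on symlink behavior described in the javadoc
above. The HDFS path and fragment name here are hypothetical; the file-access
line shows what code running inside a task would do, not the driver:

import java.io.File;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class CacheSymlinkSketch {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration());
    // The URI fragment chooses the link name; without "#map.zip" the file
    // name "map.zip" would be used as the link name anyway.
    job.addCacheArchive(new URI("hdfs://namenode/apps/map.zip#map.zip"));
    // Inside a running task, the archive is unpacked and symlinked into the
    // working directory, so its contents resolve via relative paths:
    File f = new File("./map.zip/some/file/in/zip.txt");
    System.out.println(f.getPath());
  }
}
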
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java Fri Oct 19 02:25:55 2012
@@ -340,7 +340,7 @@ public class IFile {
                   CompressionCodec codec,
                   Counters.Counter readsCounter) throws IOException {
       readRecordsCounter = readsCounter;
-      checksumIn = new IFileInputStream(in,length);
+      checksumIn = new IFileInputStream(in,length, conf);
       if (codec != null) {
         decompressor = CodecPool.getDecompressor(codec);
         if (decompressor != null) {

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFileInputStream.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFileInputStream.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFileInputStream.java Fri Oct 19 02:25:55 2012
@@ -19,13 +19,22 @@
 package org.apache.hadoop.mapred;
 
 import java.io.EOFException;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.HasFileDescriptor;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.util.DataChecksum;
 /**
  * A checksum input stream, used for IFiles.
@@ -35,7 +44,8 @@ import org.apache.hadoop.util.DataChecks
 @InterfaceStability.Unstable
 public class IFileInputStream extends InputStream {
   
-  private final InputStream in; //The input stream to be verified for checksum. 
+  private final InputStream in; //The input stream to be verified for checksum.
+  private final FileDescriptor inFd; // the file descriptor, if it is known
   private final long length; //The total length of the input file
   private final long dataLength;
   private DataChecksum sum;
@@ -43,7 +53,14 @@ public class IFileInputStream extends In
   private final byte b[] = new byte[1];
   private byte csum[] = null;
   private int checksumSize;
-  
+
+  private ReadaheadRequest curReadahead = null;
+  private ReadaheadPool raPool = ReadaheadPool.getInstance();
+  private boolean readahead;
+  private int readaheadLength;
+
+  public static final Log LOG = LogFactory.getLog(IFileInputStream.class);
+
   private boolean disableChecksumValidation = false;
   
   /**
@@ -51,13 +68,36 @@ public class IFileInputStream extends In
    * @param in The input stream to be verified for checksum.
    * @param len The length of the input stream including checksum bytes.
    */
-  public IFileInputStream(InputStream in, long len) {
+  public IFileInputStream(InputStream in, long len, Configuration conf) {
     this.in = in;
-    sum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 
+    this.inFd = getFileDescriptorIfAvail(in);
+    sum = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 
         Integer.MAX_VALUE);
     checksumSize = sum.getChecksumSize();
     length = len;
     dataLength = length - checksumSize;
+
+    conf = (conf != null) ? conf : new Configuration();
+    readahead = conf.getBoolean(MRConfig.MAPRED_IFILE_READAHEAD,
+        MRConfig.DEFAULT_MAPRED_IFILE_READAHEAD);
+    readaheadLength = conf.getInt(MRConfig.MAPRED_IFILE_READAHEAD_BYTES,
+        MRConfig.DEFAULT_MAPRED_IFILE_READAHEAD_BYTES);
+
+    doReadahead();
+  }
+
+  private static FileDescriptor getFileDescriptorIfAvail(InputStream in) {
+    FileDescriptor fd = null;
+    try {
+      if (in instanceof HasFileDescriptor) {
+        fd = ((HasFileDescriptor)in).getFileDescriptor();
+      } else if (in instanceof FileInputStream) {
+        fd = ((FileInputStream)in).getFD();
+      }
+    } catch (IOException e) {
+      LOG.info("Unable to determine FileDescriptor", e);
+    }
+    return fd;
   }
 
   /**
@@ -66,6 +106,10 @@ public class IFileInputStream extends In
    */
   @Override
   public void close() throws IOException {
+
+    if (curReadahead != null) {
+      curReadahead.cancel();
+    }
     if (currentOffset < dataLength) {
       byte[] t = new byte[Math.min((int)
             (Integer.MAX_VALUE & (dataLength - currentOffset)), 32 * 1024)];
@@ -102,10 +146,21 @@ public class IFileInputStream extends In
     if (currentOffset >= dataLength) {
       return -1;
     }
-    
+
+    doReadahead();
+
     return doRead(b,off,len);
   }
 
+  private void doReadahead() {
+    if (raPool != null && inFd != null && readahead) {
+      curReadahead = raPool.readaheadStream(
+          "ifile", inFd,
+          currentOffset, readaheadLength, dataLength,
+          curReadahead);
+    }
+  }
+
   /**
    * Read bytes from the stream.
    * At EOF, checksum is validated and sent back

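A sketch of the new three-argument constructor in use, assuming a valid local
IFile (with its trailing CRC32 checksum) at a hypothetical path. Passing a
FileInputStream lets getFileDescriptorIfAvail() recover the file descriptor,
which is what allows the readahead requests to be issued:

import java.io.File;
import java.io.FileInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.IFileInputStream;
import org.apache.hadoop.mapreduce.MRConfig;

public class ReadaheadSketch {
  public static void main(String[] args) throws Exception {
    File ifile = new File("/tmp/output/file.out");  // hypothetical IFile
    Configuration conf = new Configuration();
    conf.setBoolean(MRConfig.MAPRED_IFILE_READAHEAD, true);
    conf.setInt(MRConfig.MAPRED_IFILE_READAHEAD_BYTES, 2 * 1024 * 1024);
    // A FileInputStream exposes a FileDescriptor, so readahead can engage;
    // a plain InputStream would silently disable it.
    IFileInputStream in = new IFileInputStream(
        new FileInputStream(ifile), ifile.length(), conf);
    try {
      byte[] buf = new byte[64 * 1024];
      // Checksum is validated at EOF; a non-IFile input would fail there.
      while (in.read(buf, 0, buf.length) != -1) { }
    } finally {
      in.close();
    }
  }
}
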
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFileOutputStream.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFileOutputStream.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFileOutputStream.java Fri Oct 19 02:25:55 2012
@@ -49,7 +49,7 @@ public class IFileOutputStream extends F
    */
   public IFileOutputStream(OutputStream out) {
     super(out);
-    sum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32,
+    sum = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
         Integer.MAX_VALUE);
     barray = new byte[sum.getChecksumSize()];
   }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java Fri Oct 19 02:25:55 2012
@@ -67,13 +67,13 @@ class IndexCache {
     if (info == null) {
       info = readIndexFileToCache(fileName, mapId, expectedIndexOwner);
     } else {
-      while (isUnderConstruction(info)) {
-        try {
-          // In case the entry is ready after the above check but
-          // before the following wait, we do timed wait.
-          info.wait(200);
-        } catch (InterruptedException e) {
-          throw new IOException("Interrupted waiting for construction", e);
+      synchronized(info) {
+        while (isUnderConstruction(info)) {
+          try {
+            info.wait();
+          } catch (InterruptedException e) {
+            throw new IOException("Interrupted waiting for construction", e);
+          }
         }
       }
       LOG.debug("IndexCache HIT: MapId " + mapId + " found");
@@ -101,13 +101,13 @@ class IndexCache {
     IndexInformation info;
     IndexInformation newInd = new IndexInformation();
     if ((info = cache.putIfAbsent(mapId, newInd)) != null) {
-      while (isUnderConstruction(info)) {
-        try {
-          // In case the entry is ready after the above check but
-          // before the following wait, we do timed wait.
-          info.wait(200);
-        } catch (InterruptedException e) {
-          throw new IOException("Interrupted waiting for construction", e);
+      synchronized(info) {
+        while (isUnderConstruction(info)) {
+          try {
+            info.wait();
+          } catch (InterruptedException e) {
+            throw new IOException("Interrupted waiting for construction", e);
+          }
         }
       }
       LOG.debug("IndexCache HIT: MapId " + mapId + " found");

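The change above replaces the timed wait with the standard guarded-wait idiom:
Object.wait() must run while holding the object's monitor, and the condition
is re-checked in a loop after every wakeup, so no notification can be lost. A
generic sketch of the pattern (not IndexCache itself):

public class GuardedWaitSketch {
  private final Object lock = new Object();
  private boolean ready = false;

  public void awaitReady() throws InterruptedException {
    synchronized (lock) {
      while (!ready) {      // re-check after every wakeup, spurious or real
        lock.wait();        // atomically releases the monitor while waiting
      }
    }
  }

  public void markReady() {
    synchronized (lock) {
      ready = true;
      lock.notifyAll();     // wake all waiters; they re-check the condition
    }
  }
}
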
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java Fri Oct 19 02:25:55 2012
@@ -1357,7 +1357,7 @@ public class JobConf extends Configurati
    * @return the maximum no. of failures of a given job per tasktracker.
    */
   public int getMaxTaskFailuresPerTracker() {
-    return getInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 4); 
+    return getInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 3);
   }
 
   /**

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobQueueClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobQueueClient.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobQueueClient.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobQueueClient.java Fri Oct 19 02:25:55 2012
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -184,7 +185,7 @@ class JobQueueClient extends Configured 
     printJobQueueInfo(jobQueueInfo, new PrintWriter(System.out));
     if (showJobs && (jobQueueInfo.getChildren() == null ||
         jobQueueInfo.getChildren().size() == 0)) {
-      JobStatus[] jobs = jc.getJobsFromQueue(queue);
+      JobStatus[] jobs = jobQueueInfo.getJobStatuses();
       if (jobs == null)
         jobs = new JobStatus[0];
       jc.displayJobList(jobs);

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java Fri Oct 19 02:25:55 2012
@@ -238,7 +238,7 @@ public class JobStatus extends org.apach
       stat.getSetupProgress(), stat.getMapProgress(), stat.getReduceProgress(),
       stat.getCleanupProgress(), stat.getState().getValue(), 
       JobPriority.valueOf(stat.getPriority().name()),
-      stat.getUsername(), stat.getJobName(), stat.getJobFile(),
+      stat.getUsername(), stat.getJobName(), stat.getQueue(), stat.getJobFile(),
       stat.getTrackingUrl(), stat.isUber());
     old.setStartTime(stat.getStartTime());
     old.setFinishTime(stat.getFinishTime());

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java Fri Oct 19 02:25:55 2012
@@ -424,6 +424,7 @@ class MapTask extends Task {
       job.setLong(JobContext.MAP_INPUT_START, fileSplit.getStart());
       job.setLong(JobContext.MAP_INPUT_PATH, fileSplit.getLength());
     }
+    LOG.info("Processing split: " + inputSplit);
   }
 
   static class NewTrackingRecordReader<K,V> 
@@ -694,6 +695,7 @@ class MapTask extends Task {
     org.apache.hadoop.mapreduce.InputSplit split = null;
     split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
         splitIndex.getStartOffset());
+    LOG.info("Processing split: " + split);
 
     org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
       new NewTrackingRecordReader<INKEY,INVALUE>

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java Fri Oct 19 02:25:55 2012
@@ -66,14 +66,6 @@ public class TaskLog {
   
   // localFS is set in (and used by) writeToIndexFile()
   static LocalFileSystem localFS = null;
-  static {
-    if (!LOG_DIR.exists()) {
-      boolean b = LOG_DIR.mkdirs();
-      if (!b) {
-        LOG.debug("mkdirs failed. Ignoring.");
-      }
-    }
-  }
   
   public static String getMRv2LogDir() {
     return System.getProperty(MRJobConfig.TASK_LOG_DIR);
@@ -638,6 +630,12 @@ public class TaskLog {
    * @return base log directory
    */
   static File getUserLogDir() {
+    if (!LOG_DIR.exists()) {
+      boolean b = LOG_DIR.mkdirs();
+      if (!b) {
+        LOG.debug("mkdirs failed. Ignoring.");
+      }
+    }
     return LOG_DIR;
   }
   

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java Fri Oct 19 02:25:55 2012
@@ -31,7 +31,7 @@ import org.apache.hadoop.mapred.Partitio
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public class TotalOrderPartitioner<K extends WritableComparable<?>,V>
+public class TotalOrderPartitioner<K,V>
     extends org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner<K, V>
     implements Partitioner<K,V> {
 

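With the WritableComparable bound dropped, the old-API wrapper accepts any key
type its configured comparator can order, matching its new-API superclass. A
usage sketch under that assumption (the partition file path is hypothetical):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

public class TotalOrderSketch {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    job.setPartitionerClass(TotalOrderPartitioner.class);
    // The partition file holds the pre-sampled split points for the keys.
    TotalOrderPartitioner.setPartitionFile(job,
        new Path("/tmp/_partitions.lst"));
  }
}
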
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java Fri Oct 19 02:25:55 2012
@@ -313,7 +313,6 @@ public class Submitter extends Configure
     // add default debug script only when executable is expressed as
     // <path>#<executable>
     if (exec.contains("#")) {
-      DistributedCache.createSymlink(conf);
       // set default gdb commands for map and reduce task 
       String defScript = "$HADOOP_PREFIX/src/c++/pipes/debug/pipes-default-script";
       setIfUnset(conf, MRJobConfig.MAP_DEBUG_SCRIPT,defScript);

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java Fri Oct 19 02:25:55 2012
@@ -131,6 +131,8 @@ public class Job extends JobContextImpl 
 
   Job(JobConf conf) throws IOException {
     super(conf, null);
+    // propagate existing user credentials to job
+    this.credentials.mergeAll(this.ugi.getCredentials());
     this.cluster = null;
   }
 
@@ -1049,9 +1051,10 @@ public class Job extends JobContextImpl 
   }
 
   /**
-   * This method allows you to create symlinks in the current working directory
-   * of the task to all the cache files/archives
+   * Originally intended to enable symlinks, but currently symlinks cannot be
+   * disabled.
    */
+  @Deprecated
   public void createSymlink() {
     ensureState(JobState.DEFINE);
     DistributedCache.createSymlink(conf);

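The constructor change above means tokens already attached to the submitting
user's UGI become visible through the job's credentials. A minimal sketch of
the observable effect:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;

public class CredentialsSketch {
  public static void main(String[] args) throws Exception {
    // Tokens placed on the current UGI before job creation...
    Credentials ugiCreds =
        UserGroupInformation.getCurrentUser().getCredentials();
    Job job = Job.getInstance(new Configuration());
    // ...are now merged into the job's credentials at construction time.
    Credentials jobCreds = job.getCredentials();
    System.out.println(
        jobCreds.numberOfTokens() >= ugiCreds.numberOfTokens());
  }
}
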
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java Fri Oct 19 02:25:55 2012
@@ -221,10 +221,11 @@ public interface JobContext extends MRJo
   public String getUser();
   
   /**
-   * This method checks to see if symlinks are to be create for the 
-   * localized cache files in the current working directory 
-   * @return true if symlinks are to be created- else return false
+   * Originally intended to check if symlinks should be used, but currently
+   * symlinks cannot be disabled.
+   * @return true
    */
+  @Deprecated
   public boolean getSymlink();
   
   /**
@@ -251,14 +252,22 @@ public interface JobContext extends MRJo
    * Return the path array of the localized caches
    * @return A path array of localized caches
    * @throws IOException
+   * @deprecated the array returned only includes the items that were
+   * downloaded. There is no way to map this to what is returned by
+   * {@link #getCacheArchives()}.
    */
+  @Deprecated
   public Path[] getLocalCacheArchives() throws IOException;
 
   /**
    * Return the path array of the localized files
    * @return A path array of localized files
    * @throws IOException
+   * @deprecated the array returned only includes the items that were
+   * downloaded. There is no way to map this to what is returned by
+   * {@link #getCacheFiles()}.
    */
+  @Deprecated
   public Path[] getLocalCacheFiles() throws IOException;
 
   /**

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java Fri Oct 19 02:25:55 2012
@@ -135,8 +135,9 @@ class JobSubmitter {
       short replication) throws IOException {
     Configuration conf = job.getConfiguration();
     if (!(conf.getBoolean(Job.USED_GENERIC_PARSER, false))) {
-      LOG.warn("Use GenericOptionsParser for parsing the arguments. " +
-               "Applications should implement Tool for the same.");
+      LOG.warn("Hadoop command-line option parsing not performed. " +
+               "Implement the Tool interface and execute your application " +
+               "with ToolRunner to remedy this.");
     }
 
     // get all the command line arguments passed in by the user conf
@@ -189,7 +190,6 @@ class JobSubmitter {
          //should not throw a URI exception
           throw new IOException("Failed to create uri for " + tmpFile, ue);
         }
-        DistributedCache.createSymlink(conf);
       }
     }
       
@@ -224,7 +224,6 @@ class JobSubmitter {
          //should not throw a URI exception
           throw new IOException("Failed to create uri for " + tmpArchives, ue);
         }
-        DistributedCache.createSymlink(conf);
       }
     }
 
@@ -233,9 +232,17 @@ class JobSubmitter {
       if ("".equals(job.getJobName())){
         job.setJobName(new Path(jobJar).getName());
       }
-      copyJar(new Path(jobJar), JobSubmissionFiles.getJobJar(submitJobDir), 
-          replication);
-      job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString());
+      Path jobJarPath = new Path(jobJar);
+      URI jobJarURI = jobJarPath.toUri();
+      // If the job jar is already in fs, we don't need to copy it from local fs
+      if (jobJarURI.getScheme() == null || jobJarURI.getAuthority() == null
+              || !(jobJarURI.getScheme().equals(jtFs.getUri().getScheme()) 
+                  && jobJarURI.getAuthority().equals(
+                                            jtFs.getUri().getAuthority()))) {
+        copyJar(jobJarPath, JobSubmissionFiles.getJobJar(submitJobDir), 
+            replication);
+        job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString());
+      }
     } else {
       LOG.warn("No job jar file set.  User classes may not be found. "+
       "See Job or Job#setJar(String).");
@@ -428,13 +435,9 @@ class JobSubmitter {
   
   private void printTokens(JobID jobId,
       Credentials credentials) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Printing tokens for job: " + jobId);
-      for(Token<?> token: credentials.getAllTokens()) {
-        if (token.getKind().toString().equals("HDFS_DELEGATION_TOKEN")) {
-          LOG.debug("Submitting with " + token);
-        }
-      }
+    LOG.info("Submitting tokens for job: " + jobId);
+    for (Token<?> token: credentials.getAllTokens()) {
+      LOG.info(token);
     }
   }
 

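The reworded warning above points users at the Tool/ToolRunner pattern. A
minimal sketch of what it asks for (class and job names are hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyJob extends Configured implements Tool {
  @Override
  public int run(String[] args) throws Exception {
    // getConf() already reflects -D, -files, -archives, etc.,
    // parsed by ToolRunner before run() is invoked.
    Job job = Job.getInstance(getConf(), "my-job");
    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new Configuration(), new MyJob(), args));
  }
}
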
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java Fri Oct 19 02:25:55 2012
@@ -79,4 +79,25 @@ public interface MRConfig {
   public static final int MAX_BLOCK_LOCATIONS_DEFAULT = 10;
   public static final String MAX_BLOCK_LOCATIONS_KEY =
     "mapreduce.job.max.split.locations";
-}
+
+  public static final String SHUFFLE_SSL_ENABLED_KEY =
+    "mapreduce.shuffle.ssl.enabled";
+
+  public static final boolean SHUFFLE_SSL_ENABLED_DEFAULT = false;
+
+  /**
+   * Configuration key to enable/disable IFile readahead.
+   */
+  public static final String MAPRED_IFILE_READAHEAD =
+    "mapreduce.ifile.readahead";
+
+  public static final boolean DEFAULT_MAPRED_IFILE_READAHEAD = true;
+
+  /**
+   * Configuration key to set the IFile readahead length in bytes.
+   */
+  public static final String MAPRED_IFILE_READAHEAD_BYTES =
+    "mapreduce.ifile.readahead.bytes";
+
+  public static final int DEFAULT_MAPRED_IFILE_READAHEAD_BYTES =
+    4 * 1024 * 1024;
+}

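A small sketch of reading the keys added above with their shipped defaults, as
a consumer of this Configuration would:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRConfig;

public class MRConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    boolean sslShuffle = conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
        MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT);           // false by default
    boolean readahead = conf.getBoolean(MRConfig.MAPRED_IFILE_READAHEAD,
        MRConfig.DEFAULT_MAPRED_IFILE_READAHEAD);        // true by default
    int readaheadBytes = conf.getInt(MRConfig.MAPRED_IFILE_READAHEAD_BYTES,
        MRConfig.DEFAULT_MAPRED_IFILE_READAHEAD_BYTES);  // 4 MiB by default
    System.out.println(sslShuffle + " " + readahead + " " + readaheadBytes);
  }
}
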
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Fri Oct 19 02:25:55 2012
@@ -114,6 +114,10 @@ public interface MRJobConfig {
 
   public static final String CACHE_ARCHIVES_VISIBILITIES = "mapreduce.job.cache.archives.visibilities";
 
+  /**
+   * @deprecated Symlinks are always on and cannot be disabled.
+   */
+  @Deprecated
   public static final String CACHE_SYMLINK = "mapreduce.job.cache.symlink.create";
 
   public static final String USER_LOG_RETAIN_HOURS = "mapreduce.job.userlog.retain.hours";
@@ -583,4 +587,18 @@ public interface MRJobConfig {
   MR_AM_SECURITY_SERVICE_AUTHORIZATION_CLIENT =
       "security.job.client.protocol.acl";
 
+  /**
+   * CLASSPATH for all YARN MapReduce applications.
+   */
+  public static final String MAPREDUCE_APPLICATION_CLASSPATH = 
+      "mapreduce.application.classpath";
+
+  /**
+   * Default CLASSPATH for all YARN MapReduce applications.
+   */
+  public static final String[] DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH = {
+      "$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*",
+      "$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*",
+  };
+  
 }
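
A sketch of reading the new classpath setting with its default, as an AM or
client might (the surrounding setup is assumed):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class ClasspathSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Falls back to the two $HADOOP_MAPRED_HOME globs defined above.
    String[] cp = conf.getStrings(
        MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH,
        MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH);
    for (String entry : cp) {
      System.out.println(entry);
    }
  }
}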