Posted to mapreduce-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:50:52 UTC

svn commit: r1619012 [4/7] - in /hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project: ./ bin/ conf/ dev-support/ hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/...

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Tue Aug 19 23:49:39 2014
@@ -81,6 +81,7 @@ import org.apache.hadoop.yarn.event.Disp
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.event.InlineDispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -535,7 +536,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job1 = new JobImpl(jobId, null, conf1, null, null, null, null, null,
-        null, null, null, true, null, 0, null, null, null, null);
+        null, null, null, true, user1, 0, null, null, null, null);
     Assert.assertTrue(job1.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertFalse(job1.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -546,7 +547,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job2 = new JobImpl(jobId, null, conf2, null, null, null, null, null,
-        null, null, null, true, null, 0, null, null, null, null);
+        null, null, null, true, user1, 0, null, null, null, null);
     Assert.assertTrue(job2.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job2.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -557,7 +558,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job3 = new JobImpl(jobId, null, conf3, null, null, null, null, null,
-        null, null, null, true, null, 0, null, null, null, null);
+        null, null, null, true, user1, 0, null, null, null, null);
     Assert.assertTrue(job3.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job3.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -568,7 +569,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job4 = new JobImpl(jobId, null, conf4, null, null, null, null, null,
-        null, null, null, true, null, 0, null, null, null, null);
+        null, null, null, true, user1, 0, null, null, null, null);
     Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB));
     Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB));
 
@@ -579,7 +580,7 @@ public class TestJobImpl {
 
     // Verify access
     JobImpl job5 = new JobImpl(jobId, null, conf5, null, null, null, null, null,
-        null, null, null, true, null, 0, null, null, null, null);
+        null, null, null, true, user1, 0, null, null, null, null);
     Assert.assertTrue(job5.checkAccess(ugi1, null));
     Assert.assertTrue(job5.checkAccess(ugi2, null));
   }
@@ -656,6 +657,15 @@ public class TestJobImpl {
     conf.setInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 1);
     isUber = testUberDecision(conf);
     Assert.assertFalse(isUber);
+
+    // enable uber mode with 0 reducers, regardless of reducer memory or vcores
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
+    conf.setInt(MRJobConfig.NUM_REDUCES, 0);
+    conf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 2048);
+    conf.setInt(MRJobConfig.REDUCE_CPU_VCORES, 10);
+    isUber = testUberDecision(conf);
+    Assert.assertTrue(isUber);
   }
 
   private boolean testUberDecision(Configuration conf) {
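
For context, a hedged sketch of driver-side configuration covered by the new case (the Job API usage here is illustrative, not part of this patch):

    Job job = Job.getInstance(new Configuration());
    job.getConfiguration().setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
    job.setNumReduceTasks(0);
    // With zero reducers, reduce-side resource requests no longer
    // disqualify the job from running as an uber task.
    job.getConfiguration().setInt(MRJobConfig.REDUCE_MEMORY_MB, 2048);
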
@@ -728,6 +738,35 @@ public class TestJobImpl {
     commitHandler.stop();
   }
 
+  static final String EXCEPTIONMSG = "Splits max exceeded";
+  @Test
+  public void testMetaInfoSizeOverMax() throws Exception {
+    Configuration conf = new Configuration();
+    JobID jobID = JobID.forName("job_1234567890000_0001");
+    JobId jobId = TypeConverter.toYarn(jobID);
+    MRAppMetrics mrAppMetrics = MRAppMetrics.create();
+    JobImpl job =
+        new JobImpl(jobId, ApplicationAttemptId.newInstance(
+          ApplicationId.newInstance(0, 0), 0), conf, mock(EventHandler.class),
+          null, new JobTokenSecretManager(), new Credentials(), null, null,
+          mrAppMetrics, null, true, null, 0, null, null, null, null);
+    InitTransition initTransition = new InitTransition() {
+        @Override
+        protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
+          throw new YarnRuntimeException(EXCEPTIONMSG);
+        }
+      };
+    JobEvent mockJobEvent = mock(JobEvent.class);
+
+    JobStateInternal jobSI = initTransition.transition(job, mockJobEvent);
+    Assert.assertTrue("When init fails, return value from InitTransition.transition should equal NEW.",
+                      jobSI.equals(JobStateInternal.NEW));
+    Assert.assertTrue("Job diagnostics should contain YarnRuntimeException",
+                      job.getDiagnostics().toString().contains("YarnRuntimeException"));
+    Assert.assertTrue("Job diagnostics should contain " + EXCEPTIONMSG,
+                      job.getDiagnostics().toString().contains(EXCEPTIONMSG));
+  }
+
   private static CommitterEventHandler createCommitterEventHandler(
       Dispatcher dispatcher, OutputCommitter committer) {
     final SystemClock clock = new SystemClock();

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java Tue Aug 19 23:49:39 2014
@@ -20,7 +20,7 @@ package org.apache.hadoop.mapreduce.v2.a
 
 import java.util.Map;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java Tue Aug 19 23:49:39 2014
@@ -33,7 +33,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -65,6 +65,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
@@ -795,6 +796,178 @@ public class TestTaskAttempt{
 		finishTime, Long.valueOf(taImpl.getFinishTime()));  
   }
   
+  @Test
+  public void testContainerKillAfterAssigned() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
+        0);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    Path jobFile = mock(Path.class);
+
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+    when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
+
+    AppContext appCtx = mock(AppContext.class);
+    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+    Resource resource = mock(Resource.class);
+    when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+    when(resource.getMemory()).thenReturn(1024);
+
+    TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
+        jobFile, 1, splits, jobConf, taListener, new Token(),
+        new Credentials(), new SystemClock(), appCtx);
+
+    NodeId nid = NodeId.newInstance("127.0.0.2", 0);
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_SCHEDULE));
+    taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container,
+        mock(Map.class)));
+    assertEquals("Task attempt is not in assinged state",
+        taImpl.getInternalState(), TaskAttemptStateInternal.ASSIGNED);
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_KILL));
+    assertEquals("Task should be in KILLED state",
+        TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+        taImpl.getInternalState());
+  }
+
+  @Test
+  public void testContainerKillWhileRunning() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
+        0);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    Path jobFile = mock(Path.class);
+
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+    when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
+
+    AppContext appCtx = mock(AppContext.class);
+    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+    Resource resource = mock(Resource.class);
+    when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+    when(resource.getMemory()).thenReturn(1024);
+
+    TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
+        jobFile, 1, splits, jobConf, taListener, new Token(),
+        new Credentials(), new SystemClock(), appCtx);
+
+    NodeId nid = NodeId.newInstance("127.0.0.2", 0);
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_SCHEDULE));
+    taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container,
+        mock(Map.class)));
+    taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+    assertEquals("Task attempt is not in running state", taImpl.getState(),
+        TaskAttemptState.RUNNING);
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_KILL));
+    assertFalse("InternalError occurred trying to handle TA_KILL",
+        eventHandler.internalError);
+    assertEquals("Task should be in KILLED state",
+        TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+        taImpl.getInternalState());
+  }
+
+  @Test
+  public void testContainerKillWhileCommitPending() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
+        0);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    Path jobFile = mock(Path.class);
+
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+    when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
+
+    AppContext appCtx = mock(AppContext.class);
+    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+    Resource resource = mock(Resource.class);
+    when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+    when(resource.getMemory()).thenReturn(1024);
+
+    TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
+        jobFile, 1, splits, jobConf, taListener, new Token(),
+        new Credentials(), new SystemClock(), appCtx);
+
+    NodeId nid = NodeId.newInstance("127.0.0.2", 0);
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_SCHEDULE));
+    taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container,
+        mock(Map.class)));
+    taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+    assertEquals("Task attempt is not in running state", taImpl.getState(),
+        TaskAttemptState.RUNNING);
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_COMMIT_PENDING));
+    assertEquals("Task should be in COMMIT_PENDING state",
+        TaskAttemptStateInternal.COMMIT_PENDING, taImpl.getInternalState());
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_KILL));
+    assertFalse("InternalError occurred trying to handle TA_KILL",
+        eventHandler.internalError);
+    assertEquals("Task should be in KILLED state",
+        TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+        taImpl.getInternalState());
+  }
+
   public static class MockEventHandler implements EventHandler {
     public boolean internalError;
 
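Taken together, the three new tests assert the same target state from different starting points (a summary of the assertions above, not of the full TaskAttemptImpl state machine):

    ASSIGNED        --TA_KILL--> KILL_CONTAINER_CLEANUP
    RUNNING         --TA_KILL--> KILL_CONTAINER_CLEANUP
    COMMIT_PENDING  --TA_KILL--> KILL_CONTAINER_CLEANUP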

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java Tue Aug 19 23:49:39 2014
@@ -27,7 +27,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileStatus;

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java Tue Aug 19 23:49:39 2014
@@ -30,7 +30,7 @@ import java.util.Map;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -439,4 +439,4 @@ public class TestContainerLauncher {
       throw new IOException(e);
     }
   }
-}
\ No newline at end of file
+}

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java Tue Aug 19 23:49:39 2014
@@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData;
@@ -402,7 +403,7 @@ public class TestContainerLauncherImpl {
         1234), "password".getBytes(), new ContainerTokenIdentifier(
         contId, containerManagerAddr, "user",
         Resource.newInstance(1024, 1),
-        currentTime + 10000L, 123, currentTime));
+        currentTime + 10000L, 123, currentTime, Priority.newInstance(0), 0));
   }
 
   private static class ContainerManagerForTest implements ContainerManagementProtocol {

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java Tue Aug 19 23:49:39 2014
@@ -31,7 +31,7 @@ import java.util.Map.Entry;
 
 import javax.net.ssl.SSLException;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.apache.commons.httpclient.HttpStatus;
 import org.apache.hadoop.conf.Configuration;

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestBlocks.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestBlocks.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestBlocks.java Tue Aug 19 23:49:39 2014
@@ -106,6 +106,7 @@ public class TestBlocks {
     when(report.getTaskState()).thenReturn(TaskState.SUCCEEDED);
     when(report.getStartTime()).thenReturn(100001L);
     when(report.getFinishTime()).thenReturn(100011L);
+    when(report.getStatus()).thenReturn("Dummy Status \n*");
 
 
     when(task.getReport()).thenReturn(report);
@@ -134,6 +135,8 @@ public class TestBlocks {
     assertTrue(data.toString().contains("SUCCEEDED"));
     assertTrue(data.toString().contains("100001"));
     assertTrue(data.toString().contains("100011"));
+    assertFalse(data.toString().contains("Dummy Status \n*"));
+    assertTrue(data.toString().contains("Dummy Status \\n*"));
 
 
   }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java Tue Aug 19 23:49:39 2014
@@ -168,10 +168,14 @@ public class FileNameIndexUtils {
           decodeJobHistoryFileName(jobDetails[QUEUE_NAME_INDEX]));
 
       try{
-        indexInfo.setJobStartTime(
-          Long.parseLong(decodeJobHistoryFileName(jobDetails[JOB_START_TIME_INDEX])));
+        if (jobDetails.length <= JOB_START_TIME_INDEX) {
+          indexInfo.setJobStartTime(indexInfo.getSubmitTime());
+        } else {
+          indexInfo.setJobStartTime(
+              Long.parseLong(decodeJobHistoryFileName(jobDetails[JOB_START_TIME_INDEX])));
+        }
       } catch (NumberFormatException e){
-        LOG.warn("Unable to parse launch time from job history file "
+        LOG.warn("Unable to parse start time from job history file "
             + jhFileName + " : " + e);
       }
     } catch (IndexOutOfBoundsException e) {
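
A hedged sketch of the backward-compatible parsing added above (oldFormatFileName is a hypothetical history file name that predates the start-time field; the companion test in TestFileNameIndexUtils below constructs one):

    // Old-format names lacking the start-time field now parse without
    // error; the start time falls back to the submit time.
    JobIndexInfo info = FileNameIndexUtils.getIndexInfo(oldFormatFileName);
    assert info.getJobStartTime() == info.getSubmitTime();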

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java Tue Aug 19 23:49:39 2014
@@ -38,7 +38,9 @@ public class JHAdminConfig {
   public static final int DEFAULT_MR_HISTORY_PORT = 10020;
   public static final String DEFAULT_MR_HISTORY_ADDRESS = "0.0.0.0:" +
       DEFAULT_MR_HISTORY_PORT;
-  
+  public static final String MR_HISTORY_BIND_HOST = MR_HISTORY_PREFIX
+      + "bind-host";
+
   /** The address of the History server admin interface. */
   public static final String JHS_ADMIN_ADDRESS = MR_HISTORY_PREFIX
       + "admin.address";

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java Tue Aug 19 23:49:39 2014
@@ -22,20 +22,24 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Calendar;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -117,6 +121,7 @@ public class JobHistoryUtils {
   public static final String TIMESTAMP_DIR_REGEX = "\\d{4}" + "\\" + Path.SEPARATOR +  "\\d{2}" + "\\" + Path.SEPARATOR + "\\d{2}";
   public static final Pattern TIMESTAMP_DIR_PATTERN = Pattern.compile(TIMESTAMP_DIR_REGEX);
   private static final String TIMESTAMP_DIR_FORMAT = "%04d" + File.separator + "%02d" + File.separator + "%02d";
+  private static final Log LOG = LogFactory.getLog(JobHistoryUtils.class);
 
   private static final PathFilter CONF_FILTER = new PathFilter() {
     @Override
@@ -183,7 +188,7 @@ public class JobHistoryUtils {
     Path stagingPath = MRApps.getStagingAreaDir(conf, user);
     Path path = new Path(stagingPath, jobId);
     String logDir = path.toString();
-    return logDir;
+    return ensurePathInDefaultFileSystem(logDir, conf);
   }
   
   /**
@@ -200,7 +205,7 @@ public class JobHistoryUtils {
           MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
           + "/history/done_intermediate";
     }
-    return doneDirPrefix;
+    return ensurePathInDefaultFileSystem(doneDirPrefix, conf);
   }
   
   /**
@@ -216,7 +221,69 @@ public class JobHistoryUtils {
           MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
           + "/history/done";
     }
-    return doneDirPrefix;
+    return ensurePathInDefaultFileSystem(doneDirPrefix, conf);
+  }
+
+  /**
+   * Gets the default file context for the cluster, used to keep the history
+   * done/staging locations consistent across different contexts.
+   *
+   * @return the default file context, or null if it should be ignored
+   */
+  private static FileContext getDefaultFileContext() {
+    // If FS_DEFAULT_NAME_KEY was set solely by core-default.xml, ignore it.
+    // This prevents defaulting history paths to the file system specified by
+    // core-default.xml, which would not make sense in any case. A test case
+    // that wants to exercise this functionality should provide a core-site.xml.
+    FileContext fc = null;
+    Configuration defaultConf = new Configuration();
+    String[] sources;
+    sources = defaultConf.getPropertySources(
+        CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY);
+    if (sources != null &&
+        (!Arrays.asList(sources).contains("core-default.xml") ||
+        sources.length > 1)) {
+      try {
+        fc = FileContext.getFileContext(defaultConf);
+        LOG.info("Default file system [" +
+                  fc.getDefaultFileSystem().getUri() + "]");
+      } catch (UnsupportedFileSystemException e) {
+        LOG.error("Unable to create default file context [" +
+            defaultConf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY) +
+            "]",
+            e);
+      }
+    }
+    else {
+      LOG.info("Default file system is set solely " +
+          "by core-default.xml; therefore ignoring it");
+    }
+
+    return fc;
+  }
+
+  /**
+   * Ensures that a path belongs to the cluster's default file system unless
+   * 1. it is already fully qualified,
+   * 2. the current job configuration uses the default file system, or
+   * 3. it is running from a test case without a core-site.xml.
+   *
+   * @param sourcePath source path
+   * @param conf the job configuration
+   * @return the path, fully qualified in the default file system if necessary
+   */
+  private static String ensurePathInDefaultFileSystem(String sourcePath, Configuration conf) {
+    Path path = new Path(sourcePath);
+    FileContext fc = getDefaultFileContext();
+    if (fc == null ||
+        fc.getDefaultFileSystem().getUri().toString().equals(
+            conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "")) ||
+        path.toUri().getAuthority() != null ||
+        path.toUri().getScheme() != null) {
+      return sourcePath;
+    }
+
+    return fc.makeQualified(path).toString();
   }
 
   /**
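
A minimal sketch of the qualification step that ensurePathInDefaultFileSystem delegates to (hypothetical paths; FileContext.makeQualified does the actual work, as in the code above):

    FileContext fc = FileContext.getFileContext(conf);
    // A bare path such as "/mr-history/done" becomes, for example,
    // "hdfs://namenode:8020/mr-history/done"; paths that already carry
    // a scheme or authority are returned unchanged by the method above.
    Path qualified = fc.makeQualified(new Path("/mr-history/done"));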

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java Tue Aug 19 23:49:39 2014
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -326,8 +327,8 @@ public class MRApps extends Apps {
   }
 
   /**
-   * Sets a {@link ApplicationClassLoader} on the given configuration and as
-   * the context classloader, if
+   * Creates and sets an {@link ApplicationClassLoader} on the given
+   * configuration and as the thread context classloader, if
    * {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is set to true, and
    * the APP_CLASSPATH environment variable is set.
    * @param conf
@@ -335,25 +336,58 @@ public class MRApps extends Apps {
    */
   public static void setJobClassLoader(Configuration conf)
       throws IOException {
+    setClassLoader(createJobClassLoader(conf), conf);
+  }
+
+  /**
+   * Creates an {@link ApplicationClassLoader} if
+   * {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is set to true, and
+   * the APP_CLASSPATH environment variable is set.
+   * @param conf
+   * @return the created job classloader, or null if the job classloader is not
+   * enabled or the APP_CLASSPATH environment variable is not set
+   * @throws IOException
+   */
+  public static ClassLoader createJobClassLoader(Configuration conf)
+      throws IOException {
+    ClassLoader jobClassLoader = null;
     if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)) {
       String appClasspath = System.getenv(Environment.APP_CLASSPATH.key());
       if (appClasspath == null) {
-        LOG.warn("Not using job classloader since APP_CLASSPATH is not set.");
+        LOG.warn("Not creating job classloader since APP_CLASSPATH is not set.");
       } else {
-        LOG.info("Using job classloader");
+        LOG.info("Creating job classloader");
         if (LOG.isDebugEnabled()) {
           LOG.debug("APP_CLASSPATH=" + appClasspath);
         }
-        String[] systemClasses = conf.getStrings(
-            MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES);
-        ClassLoader jobClassLoader = createJobClassLoader(appClasspath,
+        String[] systemClasses = getSystemClasses(conf);
+        jobClassLoader = createJobClassLoader(appClasspath,
             systemClasses);
-        if (jobClassLoader != null) {
-          conf.setClassLoader(jobClassLoader);
-          Thread.currentThread().setContextClassLoader(jobClassLoader);
-        }
       }
     }
+    return jobClassLoader;
+  }
+
+  /**
+   * Sets the provided classloader on the given configuration and as the thread
+   * context classloader if the classloader is not null.
+   * @param classLoader
+   * @param conf
+   */
+  public static void setClassLoader(ClassLoader classLoader,
+      Configuration conf) {
+    if (classLoader != null) {
+      LOG.info("Setting classloader " + classLoader.getClass().getName() +
+          " on the configuration and as the thread context classloader");
+      conf.setClassLoader(classLoader);
+      Thread.currentThread().setContextClassLoader(classLoader);
+    }
+  }
+
+  @VisibleForTesting
+  static String[] getSystemClasses(Configuration conf) {
+    return conf.getTrimmedStrings(
+        MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES);
   }
 
   private static ClassLoader createJobClassLoader(final String appClasspath,
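
A minimal usage sketch of the refactored API above; creating and applying the job classloader can now happen in separate places:

    Configuration conf = new Configuration();
    conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true);
    // Returns null if the job classloader is not enabled or the
    // APP_CLASSPATH environment variable is not set.
    ClassLoader jobClassLoader = MRApps.createJobClassLoader(conf);
    // A no-op when the classloader is null.
    MRApps.setClassLoader(jobClassLoader, conf);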

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRWebAppUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRWebAppUtil.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRWebAppUtil.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRWebAppUtil.java Tue Aug 19 23:49:39 2014
@@ -29,6 +29,7 @@ import org.apache.hadoop.mapreduce.v2.jo
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -105,11 +106,15 @@ public class MRWebAppUtil {
   
   public static InetSocketAddress getJHSWebBindAddress(Configuration conf) {
     if (httpPolicyInJHS == Policy.HTTPS_ONLY) {
-      return conf.getSocketAddr(JHAdminConfig.MR_HISTORY_WEBAPP_HTTPS_ADDRESS,
+      return conf.getSocketAddr(
+          JHAdminConfig.MR_HISTORY_BIND_HOST,
+          JHAdminConfig.MR_HISTORY_WEBAPP_HTTPS_ADDRESS,
           JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_HTTPS_ADDRESS,
           JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_HTTPS_PORT);
     } else {
-      return conf.getSocketAddr(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
+      return conf.getSocketAddr(
+          JHAdminConfig.MR_HISTORY_BIND_HOST,
+          JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
           JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_ADDRESS,
           JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_PORT);
     }
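
A hedged sketch of the new bind-host override (assuming JHAdminConfig.MR_HISTORY_BIND_HOST resolves to "mapreduce.jobhistory.bind-host"): when the bind-host key is set, its host replaces the host from the webapp address keys, while the port still comes from those address keys.

    Configuration conf = new Configuration();
    conf.set(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS, "jhs.example.com:19888");
    conf.set(JHAdminConfig.MR_HISTORY_BIND_HOST, "0.0.0.0");
    // Expected to yield 0.0.0.0:19888 for binding, while clients are
    // still pointed at jhs.example.com:19888.
    InetSocketAddress bindAddr = MRWebAppUtil.getJHSWebBindAddress(conf);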

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClient.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClient.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClient.java Tue Aug 19 23:49:39 2014
@@ -23,16 +23,24 @@ import java.util.Collection;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 
 public class TestJobClient {
-  final static String TEST_DIR = new File(System.getProperty("test.build.data",
-      "/tmp")).getAbsolutePath();
+
+  final static String TEST_DIR = new File("target",
+    TestJobClient.class.getSimpleName()).getAbsolutePath();
+
+  @After
+  public void tearDown() {
+    FileUtil.fullyDelete(new File(TEST_DIR));
+  }
 
   @Test
   public void testGetClusterStatusWithLocalJobRunner() throws Exception {
@@ -51,11 +59,12 @@ public class TestJobClient {
     Assert.assertEquals(0, blackListedTrackersInfo.size());
   }
 
-  @Test(timeout = 1000)
+  @Test(timeout = 10000)
   public void testIsJobDirValid() throws IOException {
     Configuration conf = new Configuration();
     FileSystem fs = FileSystem.getLocal(conf);
     Path testDir = new Path(TEST_DIR);
+    fs.mkdirs(testDir);
     Assert.assertFalse(JobClient.isJobDirValid(testDir, fs));
 
     Path jobconf = new Path(testDir, "job.xml");
@@ -68,7 +77,7 @@ public class TestJobClient {
     fs.delete(jobsplit, true);
   }
   
-  @Test(timeout = 1000)
+  @Test(timeout = 10000)
   public void testGetStagingAreaDir() throws IOException, InterruptedException {
     Configuration conf = new Configuration();
     JobClient client = new JobClient(conf);

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClientGetJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClientGetJob.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClientGetJob.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClientGetJob.java Tue Aug 19 23:49:39 2014
@@ -18,7 +18,7 @@
 
 package org.apache.hadoop.mapred;
 
-import static junit.framework.Assert.assertNotNull;
+import static org.junit.Assert.assertNotNull;
 
 import java.io.IOException;
 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java Tue Aug 19 23:49:39 2014
@@ -27,7 +27,7 @@ import java.util.Arrays;
 import java.util.jar.JarOutputStream;
 import java.util.zip.ZipEntry;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRPCFactories.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRPCFactories.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRPCFactories.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRPCFactories.java Tue Aug 19 23:49:39 2014
@@ -22,7 +22,7 @@ package org.apache.hadoop.mapreduce.v2;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.Server;

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRecordFactory.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRecordFactory.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRecordFactory.java Tue Aug 19 23:49:39 2014
@@ -18,7 +18,7 @@
 
 package org.apache.hadoop.mapreduce.v2;
 
-import junit.framework.Assert;
+import org.junit.Assert;
 
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/jobhistory/TestFileNameIndexUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/jobhistory/TestFileNameIndexUtils.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/jobhistory/TestFileNameIndexUtils.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/jobhistory/TestFileNameIndexUtils.java Tue Aug 19 23:49:39 2014
@@ -39,6 +39,17 @@ public class TestFileNameIndexUtils {
     + FileNameIndexUtils.DELIMITER + "%s"
     + JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION;
 
+  private static final String OLD_FORMAT_BEFORE_ADD_START_TIME = "%s"
+      + FileNameIndexUtils.DELIMITER + "%s"
+      + FileNameIndexUtils.DELIMITER + "%s"
+      + FileNameIndexUtils.DELIMITER + "%s"
+      + FileNameIndexUtils.DELIMITER + "%s"
+      + FileNameIndexUtils.DELIMITER + "%s"
+      + FileNameIndexUtils.DELIMITER + "%s"
+      + FileNameIndexUtils.DELIMITER + "%s"
+      + FileNameIndexUtils.DELIMITER + "%s"
+      + JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION;
+
   private static final String JOB_HISTORY_FILE_FORMATTER = "%s"
     + FileNameIndexUtils.DELIMITER + "%s"
     + FileNameIndexUtils.DELIMITER + "%s"
@@ -236,6 +247,22 @@ public class TestFileNameIndexUtils {
   }
 
   @Test
+  public void testJobStartTimeBackwardsCompatible() throws IOException {
+    String jobHistoryFile = String.format(OLD_FORMAT_BEFORE_ADD_START_TIME,
+        JOB_ID,
+        SUBMIT_TIME,
+        USER_NAME,
+        JOB_NAME_WITH_DELIMITER_ESCAPE,
+        FINISH_TIME,
+        NUM_MAPS,
+        NUM_REDUCES,
+        JOB_STATUS,
+        QUEUE_NAME);
+    JobIndexInfo info = FileNameIndexUtils.getIndexInfo(jobHistoryFile);
+    Assert.assertEquals(info.getJobStartTime(), info.getSubmitTime());
+  }
+
+  @Test
   public void testJobHistoryFileNameBackwardsCompatible() throws IOException {
     JobID oldJobId = JobID.forName(JOB_ID);
     JobId jobId = TypeConverter.toYarn(oldJobId);

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java Tue Aug 19 23:49:39 2014
@@ -33,6 +33,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
@@ -57,6 +58,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.ApplicationClassLoader;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -400,7 +402,7 @@ public class TestMRApps {
     URI file = new URI("mockfs://mock/tmp/something.zip#something");
     Path filePath = new Path(file);
     URI file2 = new URI("mockfs://mock/tmp/something.txt#something");
-    Path file2Path = new Path(file);
+    Path file2Path = new Path(file2);
     
     when(mockFs.resolvePath(filePath)).thenReturn(filePath);
     when(mockFs.resolvePath(file2Path)).thenReturn(file2Path);
@@ -492,4 +494,36 @@ public class TestMRApps {
     assertTrue(MRApps.TaskStateUI.COMPLETED.correspondsTo(TaskState.KILLED));
     assertTrue(MRApps.TaskStateUI.RUNNING.correspondsTo(TaskState.RUNNING));
   }
+
+
+  private static final String[] SYS_CLASSES = new String[] {
+      "/java/fake/Klass",
+      "/javax/fake/Klass",
+      "/org/apache/commons/logging/fake/Klass",
+      "/org/apache/log4j/fake/Klass",
+      "/org/apache/hadoop/fake/Klass"
+  };
+
+  private static final String[] DEFAULT_XMLS = new String[] {
+      "core-default.xml",
+      "mapred-default.xml",
+      "hdfs-default.xml",
+      "yarn-default.xml"
+  };
+
+  @Test
+  public void testSystemClasses() {
+    final List<String> systemClasses =
+        Arrays.asList(MRApps.getSystemClasses(new Configuration()));
+    for (String defaultXml : DEFAULT_XMLS) {
+      assertTrue(defaultXml + " must be system resource",
+          ApplicationClassLoader.isSystemClass(defaultXml, systemClasses));
+    }
+    for (String klass : SYS_CLASSES) {
+      assertTrue(klass + " must be system class",
+          ApplicationClassLoader.isSystemClass(klass, systemClasses));
+    }
+    assertFalse("/fake/Klass must not be a system class",
+        ApplicationClassLoader.isSystemClass("/fake/Klass", systemClasses));
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml Tue Aug 19 23:49:39 2014
@@ -85,6 +85,16 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <configuration>
+          <excludes>
+            <exclude>src/test/resources/recordSpanningMultipleSplits.txt</exclude>
+            <exclude>src/test/resources/testBOM.txt</exclude>
+          </excludes>
+        </configuration>
+      </plugin>
     </plugins>
   </build>
 </project>

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr Tue Aug 19 23:49:39 2014
@@ -92,11 +92,11 @@
                                    }
           },
           {"name": "jobQueueName", "type": "string"},
-          {"name": "workflowId", "type": "string"},
-          {"name": "workflowName", "type": "string"},
-          {"name": "workflowNodeName", "type": "string"},
-          {"name": "workflowAdjacencies", "type": "string"},
-          {"name": "workflowTags", "type": "string"}
+          {"name": "workflowId", "type": ["null","string"], "default": null},
+          {"name": "workflowName", "type": ["null","string"], "default": null},
+          {"name": "workflowNodeName", "type": ["null","string"], "default": null},
+          {"name": "workflowAdjacencies", "type": ["null","string"], "default": null},
+          {"name": "workflowTags", "type": ["null","string"], "default": null}
       ]
      },
 
@@ -136,7 +136,7 @@
           {"name": "finishedMaps", "type": "int"},
           {"name": "finishedReduces", "type": "int"},
           {"name": "jobStatus", "type": "string"},
-          {"name": "diagnostics", "type": "string"}
+          {"name": "diagnostics", "type": ["null","string"], "default": null}
       ]
      },
 
@@ -205,8 +205,8 @@
           {"name": "httpPort", "type": "int"},
           {"name": "shufflePort", "type": "int"},
           {"name": "containerId", "type": "string"},
-          {"name": "locality", "type": "string"},
-          {"name": "avataar", "type": "string"}
+          {"name": "locality", "type": ["null","string"], "default": null},
+          {"name": "avataar", "type": ["null","string"], "default": null}
       ]
      },
 
@@ -221,7 +221,7 @@
           {"name": "rackname", "type": "string"},
           {"name": "status", "type": "string"},
           {"name": "error", "type": "string"},
-          {"name": "counters", "type": "JhCounters"},
+          {"name": "counters", "type": ["null","JhCounters"], "default": null},
           {"name": "clockSplits", "type": { "type": "array", "items": "int"}},
           {"name": "cpuUsages", "type": { "type": "array", "items": "int"}},
           {"name": "vMemKbytes", "type": { "type": "array", "items": "int"}},
@@ -237,7 +237,7 @@
           {"name": "error", "type": "string"},
           {"name": "failedDueToAttempt", "type": ["null", "string"] },
           {"name": "status", "type": "string"},
-          {"name": "counters", "type": "JhCounters"}
+          {"name": "counters", "type": ["null","JhCounters"], "default": null}
       ]
      },
 
@@ -248,7 +248,7 @@
           {"name": "finishTime", "type": "long"},
           {"name": "status", "type": "string"},
           {"name": "counters", "type": "JhCounters"},
-          {"name": "successfulAttemptId", "type": "string"}
+          {"name": "successfulAttemptId", "type": ["null","string"], "default": null}
       ]
      },
      	

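The schema changes above relax several required string fields into ["null","string"] unions with "default": null. In Avro, that combination is what makes a field optional: readers of history files written before the field existed fill it with null instead of failing schema resolution. A minimal, illustrative sketch of the behavior from the Avro Java API (the one-field schema is built inline purely for the demo):

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    public class NullableFieldDemo {
      public static void main(String[] args) {
        // An optional field: a ["null","string"] union whose default is null.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Demo\",\"fields\":["
            + "{\"name\":\"workflowId\",\"type\":[\"null\",\"string\"],"
            + "\"default\":null}]}");
        GenericRecord rec = new GenericData.Record(schema);
        // An unset optional field reads back as null rather than throwing.
        System.out.println(rec.get("workflowId"));  // prints "null"
      }
    }
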
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java Tue Aug 19 23:49:39 2014
@@ -295,6 +295,15 @@ public abstract class FileInputFormat<K,
                                 String[] hosts) {
     return new FileSplit(file, start, length, hosts);
   }
+  
+  /**
+   * A factory that makes the split for this class. It can be overridden
+   * by subclasses to return a subtype of FileSplit.
+   */
+  protected FileSplit makeSplit(Path file, long start, long length, 
+                                String[] hosts, String[] inMemoryHosts) {
+    return new FileSplit(file, start, length, hosts, inMemoryHosts);
+  }
 
   /** Splits files returned by {@link #listStatus(JobConf)} when
    * they're too big.*/ 
@@ -337,22 +346,22 @@ public abstract class FileInputFormat<K,
 
           long bytesRemaining = length;
           while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
-            String[] splitHosts = getSplitHosts(blkLocations,
+            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
                 length-bytesRemaining, splitSize, clusterMap);
             splits.add(makeSplit(path, length-bytesRemaining, splitSize,
-                splitHosts));
+                splitHosts[0], splitHosts[1]));
             bytesRemaining -= splitSize;
           }
 
           if (bytesRemaining != 0) {
-            String[] splitHosts = getSplitHosts(blkLocations, length
+            String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
                 - bytesRemaining, bytesRemaining, clusterMap);
             splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
-                splitHosts));
+                splitHosts[0], splitHosts[1]));
           }
         } else {
-          String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
-          splits.add(makeSplit(path, 0, length, splitHosts));
+          String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
+          splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
         }
       } else { 
         //Create empty hosts array for zero length files
@@ -538,10 +547,30 @@ public abstract class FileInputFormat<K,
    * @param blkLocations The list of block locations
    * @param offset 
    * @param splitSize 
-   * @return array of hosts that contribute most to this split
+   * @return an array of hosts that contribute most to this split
    * @throws IOException
    */
   protected String[] getSplitHosts(BlockLocation[] blkLocations, 
+      long offset, long splitSize, NetworkTopology clusterMap) throws IOException {
+    return getSplitHostsAndCachedHosts(blkLocations, offset, splitSize,
+        clusterMap)[0];
+  }
+  
+  /** 
+   * This function identifies and returns the hosts that contribute
+   * most to a given split. For calculating the contribution, rack
+   * locality is treated on par with host locality, so hosts from racks
+   * that contribute the most are preferred over hosts on racks that
+   * contribute less.
+   * @param blkLocations The list of block locations
+   * @param offset 
+   * @param splitSize 
+   * @return two arrays - one of hosts that contribute most to this split, and
+   *    one of hosts that contribute most to this split that have the data
+   *    cached on them
+   * @throws IOException
+   */
+  private String[][] getSplitHostsAndCachedHosts(BlockLocation[] blkLocations, 
       long offset, long splitSize, NetworkTopology clusterMap)
   throws IOException {
 
@@ -552,7 +581,8 @@ public abstract class FileInputFormat<K,
 
     //If this is the only block, just return
     if (bytesInThisBlock >= splitSize) {
-      return blkLocations[startIndex].getHosts();
+      return new String[][] { blkLocations[startIndex].getHosts(),
+          blkLocations[startIndex].getCachedHosts() };
     }
 
     long bytesInFirstBlock = bytesInThisBlock;
@@ -639,7 +669,9 @@ public abstract class FileInputFormat<K,
     
     } // for all indices
 
-    return identifyHosts(allTopos.length, racksMap);
+    // We don't yet support cached hosts when bytesInThisBlock > splitSize
+    return new String[][] { identifyHosts(allTopos.length, racksMap),
+        new String[0]};
   }
   
   private String[] identifyHosts(int replicationFactor, 

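The new two-host-list makeSplit overload is the extension point its javadoc describes: a subclass can substitute its own FileSplit subtype while FileInputFormat keeps driving the split computation. A minimal sketch against the patched API (the class name is illustrative):

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.LineRecordReader;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;

    public class MemoryAwareTextInputFormat
        extends FileInputFormat<LongWritable, Text> {

      @Override
      protected FileSplit makeSplit(Path file, long start, long length,
                                    String[] hosts, String[] inMemoryHosts) {
        // A subtype could carry extra metadata; here we just forward both
        // host lists to the new five-argument FileSplit constructor.
        return new FileSplit(file, start, length, hosts, inMemoryHosts);
      }

      @Override
      public RecordReader<LongWritable, Text> getRecordReader(
          InputSplit split, JobConf job, Reporter reporter) throws IOException {
        return new LineRecordReader(job, (FileSplit) split);
      }
    }
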
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java Tue Aug 19 23:49:39 2014
@@ -184,10 +184,16 @@ public class FileOutputCommitter extends
   }
   
   @Override
+  @Deprecated
   public boolean isRecoverySupported() {
     return true;
   }
-  
+
+  @Override
+  public boolean isRecoverySupported(JobContext context) throws IOException {
+    return getWrapped(context).isRecoverySupported(context);
+  }
+
   @Override
   public void recoverTask(TaskAttemptContext context)
       throws IOException {

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java Tue Aug 19 23:49:39 2014
@@ -24,6 +24,7 @@ import java.io.DataOutput;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.fs.Path;
 
 /** A section of an input file.  Returned by {@link
@@ -33,7 +34,7 @@ import org.apache.hadoop.fs.Path;
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class FileSplit extends org.apache.hadoop.mapreduce.InputSplit 
-                       implements InputSplit {
+                       implements InputSplitWithLocationInfo {
   org.apache.hadoop.mapreduce.lib.input.FileSplit fs; 
   protected FileSplit() {
     fs = new org.apache.hadoop.mapreduce.lib.input.FileSplit();
@@ -62,6 +63,20 @@ public class FileSplit extends org.apach
            length, hosts);
   }
   
+  /** Constructs a split with host and cached-blocks information
+   *
+   * @param file the file name
+   * @param start the position of the first byte in the file to process
+   * @param length the number of bytes in the file to process
+   * @param hosts the list of hosts containing the block, possibly null
+   * @param inMemoryHosts the list of hosts containing the block in memory
+   */
+  public FileSplit(Path file, long start, long length, String[] hosts,
+      String[] inMemoryHosts) {
+    fs = new org.apache.hadoop.mapreduce.lib.input.FileSplit(file, start,
+        length, hosts, inMemoryHosts);
+  }
+  
   public FileSplit(org.apache.hadoop.mapreduce.lib.input.FileSplit fs) {
     this.fs = fs;
   }
@@ -92,4 +107,9 @@ public class FileSplit extends org.apach
     return fs.getLocations();
   }
   
+  @Override
+  @Evolving
+  public SplitLocationInfo[] getLocationInfo() throws IOException {
+    return fs.getLocationInfo();
+  }
 }

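With FileSplit now implementing InputSplitWithLocationInfo, callers can distinguish hosts that merely store a replica on disk from hosts that also hold the block cached in memory. A short consumer-side sketch (hedged: getLocationInfo may return null when no information is available):

    import java.io.IOException;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.SplitLocationInfo;

    public class LocationInfoDemo {
      static void printInMemoryHosts(FileSplit split) throws IOException {
        SplitLocationInfo[] infos = split.getLocationInfo();
        if (infos == null) {
          return;  // no location information for this split
        }
        for (SplitLocationInfo info : infos) {
          if (info.isInMemory()) {
            // Hosts holding the block in memory are the cheapest to read from.
            System.out.println("cached on " + info.getLocation());
          }
        }
      }
    }
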
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputFormat.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputFormat.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputFormat.java Tue Aug 19 23:49:39 2014
@@ -50,7 +50,7 @@ import org.apache.hadoop.fs.FileSystem;
  * bytes, of the input files. However, the {@link FileSystem} blocksize of  
  * the input files is treated as an upper bound for input splits. A lower bound 
  * on the split size can be set via 
- * <a href="{@docRoot}/../mapred-default.html#mapreduce.input.fileinputformat.split.minsize">
+ * <a href="{@docRoot}/../hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml#mapreduce.input.fileinputformat.split.minsize">
  * mapreduce.input.fileinputformat.split.minsize</a>.</p>
  * 
  * <p>Clearly, logical splits based on input-size is insufficient for many 

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java Tue Aug 19 23:49:39 2014
@@ -112,7 +112,7 @@ import org.apache.log4j.Level;
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class JobConf extends Configuration {
-  
+
   private static final Log LOG = LogFactory.getLog(JobConf.class);
 
   static{
@@ -882,7 +882,7 @@ public class JobConf extends Configurati
       JobContext.KEY_COMPARATOR, null, RawComparator.class);
     if (theClass != null)
       return ReflectionUtils.newInstance(theClass, this);
-    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class));
+    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
   }
 
   /**

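Passing the JobConf into WritableComparator.get (here and in CompositeRecordReader further below) means a comparator that implements Configurable is handed the job configuration before its first comparison. A sketch of the kind of comparator that benefits; the "my.sort.reverse" property is hypothetical, not a Hadoop key:

    import org.apache.hadoop.conf.Configurable;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    public class ReverseTextComparator extends WritableComparator
        implements Configurable {
      private Configuration conf;

      public ReverseTextComparator() {
        super(Text.class, true);  // create key instances for deserialization
      }

      @Override
      public void setConf(Configuration conf) { this.conf = conf; }

      @Override
      public Configuration getConf() { return conf; }

      @Override
      @SuppressWarnings("rawtypes")
      public int compare(WritableComparable a, WritableComparable b) {
        int c = super.compare(a, b);
        // Only readable because get() now passes the configuration along.
        return conf != null && conf.getBoolean("my.sort.reverse", false) ? -c : c;
      }
    }
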
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java Tue Aug 19 23:49:39 2014
@@ -184,7 +184,7 @@ public class LineRecordReader implements
   private int maxBytesToConsume(long pos) {
     return isCompressedInput()
       ? Integer.MAX_VALUE
-      : (int) Math.min(Integer.MAX_VALUE, end - pos);
+      : (int) Math.max(Math.min(Integer.MAX_VALUE, end - pos), maxLineLength);
   }
 
   private long getFilePosition() throws IOException {
@@ -197,6 +197,39 @@ public class LineRecordReader implements
     return retVal;
   }
 
+  private int skipUtfByteOrderMark(Text value) throws IOException {
+    // Strip the BOM (Byte Order Mark). Text only supports UTF-8,
+    // so we only need to check for the UTF-8 BOM (0xEF,0xBB,0xBF)
+    // at the start of the text stream.
+    int newMaxLineLength = (int) Math.min(3L + (long) maxLineLength,
+        Integer.MAX_VALUE);
+    int newSize = in.readLine(value, newMaxLineLength, maxBytesToConsume(pos));
+    // Even though we read 3 extra bytes for the first line, we won't
+    // alter existing behavior (no backwards-incompatibility issue),
+    // because newSize is less than maxLineLength and the number of
+    // bytes copied to Text is never more than newSize. If the size
+    // returned by readLine is not less than maxLineLength, we discard
+    // the current line and read the next line.
+    pos += newSize;
+    int textLength = value.getLength();
+    byte[] textBytes = value.getBytes();
+    if ((textLength >= 3) && (textBytes[0] == (byte)0xEF) &&
+        (textBytes[1] == (byte)0xBB) && (textBytes[2] == (byte)0xBF)) {
+      // Found the UTF-8 BOM; strip it.
+      LOG.info("Found UTF-8 BOM and skipped it");
+      textLength -= 3;
+      newSize -= 3;
+      if (textLength > 0) {
+        // It might be possible to reuse the same buffer instead of copyBytes
+        textBytes = value.copyBytes();
+        value.set(textBytes, 3, textLength);
+      } else {
+        value.clear();
+      }
+    }
+    return newSize;
+  }
+
   /** Read a line. */
   public synchronized boolean next(LongWritable key, Text value)
     throws IOException {
@@ -206,12 +239,17 @@ public class LineRecordReader implements
     while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
       key.set(pos);
 
-      int newSize = in.readLine(value, maxLineLength,
-          Math.max(maxBytesToConsume(pos), maxLineLength));
+      int newSize = 0;
+      if (pos == 0) {
+        newSize = skipUtfByteOrderMark(value);
+      } else {
+        newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
+        pos += newSize;
+      }
+
       if (newSize == 0) {
         return false;
       }
-      pos += newSize;
       if (newSize < maxLineLength) {
         return true;
       }

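For reference, the UTF-8 BOM is exactly the three bytes 0xEF 0xBB 0xBF, and the set-with-offset trick above is how skipUtfByteOrderMark drops them from a Text value. A standalone illustration (not taken from the patch):

    import org.apache.hadoop.io.Text;

    public class BomDemo {
      public static void main(String[] args) {
        // A line whose first three bytes are the UTF-8 BOM.
        byte[] raw = { (byte) 0xEF, (byte) 0xBB, (byte) 0xBF, 'h', 'i' };
        Text value = new Text();
        value.set(raw, 0, raw.length);
        // Mirror the reader's strip logic: re-set past the first three bytes.
        byte[] copy = value.copyBytes();
        value.set(copy, 3, copy.length - 3);
        System.out.println(value);  // prints "hi"
      }
    }
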
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java Tue Aug 19 23:49:39 2014
@@ -537,6 +537,8 @@ public class Merger {  
         }
       }
       minSegment = top();
+      long startPos = minSegment.getPosition();
+      key = minSegment.getKey();
       if (!minSegment.inMemory()) {
         //When we load the value from an inmemory segment, we reset
         //the "value" DIB in this class to the inmem segment's byte[].
@@ -547,11 +549,11 @@ public class Merger {  
         //segment, we reset the "value" DIB to the byte[] in that (so 
         //we reuse the disk segment DIB whenever we consider
         //a disk segment).
+        minSegment.getValue(diskIFileValue);
         value.reset(diskIFileValue.getData(), diskIFileValue.getLength());
+      } else {
+        minSegment.getValue(value);
       }
-      long startPos = minSegment.getPosition();
-      key = minSegment.getKey();
-      minSegment.getValue(value);
       long endPos = minSegment.getPosition();
       totalBytesProcessed += endPos - startPos;
       mergeProgress.set(totalBytesProcessed * progPerByte);

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java Tue Aug 19 23:49:39 2014
@@ -176,15 +176,35 @@ public abstract class OutputCommitter 
 
   /**
    * This method implements the new interface by calling the old method. Note
-   * that the input types are different between the new and old apis and this
-   * is a bridge between the two.
+   * that the input types are different between the new and old apis and this is
+   * a bridge between the two.
+   * 
+   * @deprecated Use {@link #isRecoverySupported(JobContext)} instead.
    */
+  @Deprecated
   @Override
   public boolean isRecoverySupported() {
     return false;
   }
 
   /**
+   * Is task output recovery supported for restarting jobs?
+   * 
+   * If task output recovery is supported, job restart can be done more
+   * efficiently.
+   * 
+   * @param jobContext
+   *          Context of the job whose output is being written.
+   * @return <code>true</code> if task output recovery is supported,
+   *         <code>false</code> otherwise
+   * @throws IOException
+   * @see #recoverTask(TaskAttemptContext)
+   */
+  public boolean isRecoverySupported(JobContext jobContext) throws IOException {
+    return isRecoverySupported();
+  }
+
+  /**
    * Recover the task output. 
    * 
    * The retry-count for the job will be passed via the 
@@ -315,4 +335,15 @@ public abstract class OutputCommitter 
     recoverTask((TaskAttemptContext) taskContext);
   }
 
+  /**
+   * This method implements the new interface by calling the old method. Note
+   * that the input types are different between the new and old apis and this is
+   * a bridge between the two.
+   */
+  @Override
+  public final boolean isRecoverySupported(
+      org.apache.hadoop.mapreduce.JobContext context) throws IOException {
+    return isRecoverySupported((JobContext) context);
+  }
+
 }

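Because the new context-aware isRecoverySupported(JobContext) defaults to the old no-argument method, existing committers keep their behavior, while new ones can consult the job configuration before answering. A minimal sketch of an old-API committer using the hook; "my.recovery.enabled" is a hypothetical property:

    import java.io.IOException;
    import org.apache.hadoop.mapred.FileOutputCommitter;
    import org.apache.hadoop.mapred.JobContext;

    public class ConditionallyRecoverableCommitter extends FileOutputCommitter {
      @Override
      public boolean isRecoverySupported(JobContext context) throws IOException {
        // Hypothetical per-job switch layered on the committer's own answer.
        return context.getJobConf().getBoolean("my.recovery.enabled", true)
            && super.isRecoverySupported(context);
      }
    }
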
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java Tue Aug 19 23:49:39 2014
@@ -66,6 +66,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.util.StringUtils;
 
@@ -322,6 +323,11 @@ abstract public class Task implements Wr
   protected void reportFatalError(TaskAttemptID id, Throwable throwable, 
                                   String logMsg) {
     LOG.fatal(logMsg);
+    
+    if (ShutdownHookManager.get().isShutdownInProgress()) {
+      return;
+    }
+    
     Throwable tCause = throwable.getCause();
     String cause = tCause == null 
                    ? StringUtils.stringifyException(throwable)
@@ -1120,8 +1126,8 @@ abstract public class Task implements Wr
     if (isMapTask() && conf.getNumReduceTasks() > 0) {
       try {
         Path mapOutput =  mapOutputFile.getOutputFile();
-        FileSystem fs = mapOutput.getFileSystem(conf);
-        return fs.getFileStatus(mapOutput).getLen();
+        FileSystem localFS = FileSystem.getLocal(conf);
+        return localFS.getFileStatus(mapOutput).getLen();
       } catch (IOException e) {
         LOG.warn ("Could not find output size " , e);
       }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskCompletionEvent.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskCompletionEvent.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskCompletionEvent.java Tue Aug 19 23:49:39 2014
@@ -90,8 +90,8 @@ public class TaskCompletionEvent 
   }
   
   /**
-   * Returns enum Status.SUCESS or Status.FAILURE.
-   * @return task tracker status
+   * Returns the {@link Status} of the task completion event.
+   * @return task completion status
    */
   public Status getTaskStatus() {
     return Status.valueOf(super.getStatus().name());

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java Tue Aug 19 23:49:39 2014
@@ -199,16 +199,18 @@ public class TaskLog {
     // file first and then rename.
     File tmpIndexFile = getTmpIndexFile(currentTaskid, isCleanup);
 
-    BufferedOutputStream bos = 
-      new BufferedOutputStream(
-        SecureIOUtils.createForWrite(tmpIndexFile, 0644));
-    DataOutputStream dos = new DataOutputStream(bos);
-    //the format of the index file is
-    //LOG_DIR: <the dir where the task logs are really stored>
-    //STDOUT: <start-offset in the stdout file> <length>
-    //STDERR: <start-offset in the stderr file> <length>
-    //SYSLOG: <start-offset in the syslog file> <length>   
+    BufferedOutputStream bos = null;
+    DataOutputStream dos = null;
     try{
+      bos = new BufferedOutputStream(
+          SecureIOUtils.createForWrite(tmpIndexFile, 0644));
+      dos = new DataOutputStream(bos);
+      //the format of the index file is
+      //LOG_DIR: <the dir where the task logs are really stored>
+      //STDOUT: <start-offset in the stdout file> <length>
+      //STDERR: <start-offset in the stderr file> <length>
+      //SYSLOG: <start-offset in the syslog file> <length>   
+
       dos.writeBytes(LogFileDetail.LOCATION + logLocation + "\n"
           + LogName.STDOUT.toString() + ":");
       dos.writeBytes(Long.toString(prevOutLength) + " ");
@@ -225,8 +227,10 @@ public class TaskLog {
           + "\n");
       dos.close();
       dos = null;
+      bos.close();
+      bos = null;
     } finally {
-      IOUtils.cleanup(LOG, dos);
+      IOUtils.cleanup(LOG, dos, bos);
     }
 
     File indexFile = getIndexFile(currentTaskid, isCleanup);

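The TaskLog change applies a pattern worth naming: open each stream inside the try block so a failing constructor cannot leak the streams opened before it, null out each reference after a successful close, and let the finally block release whatever is still live. Sketched in isolation (the file path is illustrative):

    import java.io.BufferedOutputStream;
    import java.io.DataOutputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.io.IOUtils;

    public class SafeWriteDemo {
      private static final Log LOG = LogFactory.getLog(SafeWriteDemo.class);

      static void write(String path) throws IOException {
        BufferedOutputStream bos = null;
        DataOutputStream dos = null;
        try {
          bos = new BufferedOutputStream(new FileOutputStream(path));
          dos = new DataOutputStream(bos);
          dos.writeBytes("payload\n");
          dos.close();
          dos = null;   // closed cleanly; cleanup() below becomes a no-op
          bos.close();
          bos = null;
        } finally {
          // Non-null only on the error path; closes quietly and logs.
          IOUtils.cleanup(LOG, dos, bos);
        }
      }
    }
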
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java Tue Aug 19 23:49:39 2014
@@ -36,7 +36,6 @@ import org.apache.hadoop.mapred.Reporter
 /**
  * An InputFormat capable of performing joins over a set of data sources sorted
  * and partitioned the same way.
- * @see #setFormat
  *
  * A user may define new join types by setting the property
  * <tt>mapred.join.define.&lt;ident&gt;</tt> to a classname. In the expression
@@ -44,6 +43,7 @@ import org.apache.hadoop.mapred.Reporter
  * ComposableRecordReader.
  * <tt>mapred.join.keycomparator</tt> can be a classname used to compare keys
  * in the join.
+ * @see #setFormat
  * @see JoinRecordReader
  * @see MultiFilterRecordReader
  */

Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java Tue Aug 19 23:49:39 2014
@@ -131,7 +131,7 @@ public abstract class CompositeRecordRea
   public void add(ComposableRecordReader<K,? extends V> rr) throws IOException {
     kids[rr.id()] = rr;
     if (null == q) {
-      cmp = WritableComparator.get(rr.createKey().getClass());
+      cmp = WritableComparator.get(rr.createKey().getClass(), conf);
       q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
           new Comparator<ComposableRecordReader<K,?>>() {
             public int compare(ComposableRecordReader<K,?> o1,