You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:50:52 UTC
svn commit: r1619012 [4/7] - in
/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project: ./ bin/
conf/ dev-support/ hadoop-mapreduce-client/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/...
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Tue Aug 19 23:49:39 2014
@@ -81,6 +81,7 @@ import org.apache.hadoop.yarn.event.Disp
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.event.InlineDispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -535,7 +536,7 @@ public class TestJobImpl {
// Verify access
JobImpl job1 = new JobImpl(jobId, null, conf1, null, null, null, null, null,
- null, null, null, true, null, 0, null, null, null, null);
+ null, null, null, true, user1, 0, null, null, null, null);
Assert.assertTrue(job1.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertFalse(job1.checkAccess(ugi2, JobACL.VIEW_JOB));
@@ -546,7 +547,7 @@ public class TestJobImpl {
// Verify access
JobImpl job2 = new JobImpl(jobId, null, conf2, null, null, null, null, null,
- null, null, null, true, null, 0, null, null, null, null);
+ null, null, null, true, user1, 0, null, null, null, null);
Assert.assertTrue(job2.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job2.checkAccess(ugi2, JobACL.VIEW_JOB));
@@ -557,7 +558,7 @@ public class TestJobImpl {
// Verify access
JobImpl job3 = new JobImpl(jobId, null, conf3, null, null, null, null, null,
- null, null, null, true, null, 0, null, null, null, null);
+ null, null, null, true, user1, 0, null, null, null, null);
Assert.assertTrue(job3.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job3.checkAccess(ugi2, JobACL.VIEW_JOB));
@@ -568,7 +569,7 @@ public class TestJobImpl {
// Verify access
JobImpl job4 = new JobImpl(jobId, null, conf4, null, null, null, null, null,
- null, null, null, true, null, 0, null, null, null, null);
+ null, null, null, true, user1, 0, null, null, null, null);
Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB));
@@ -579,7 +580,7 @@ public class TestJobImpl {
// Verify access
JobImpl job5 = new JobImpl(jobId, null, conf5, null, null, null, null, null,
- null, null, null, true, null, 0, null, null, null, null);
+ null, null, null, true, user1, 0, null, null, null, null);
Assert.assertTrue(job5.checkAccess(ugi1, null));
Assert.assertTrue(job5.checkAccess(ugi2, null));
}
@@ -656,6 +657,15 @@ public class TestJobImpl {
conf.setInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 1);
isUber = testUberDecision(conf);
Assert.assertFalse(isUber);
+
+ // enable uber mode of 0 reducer no matter how much memory assigned to reducer
+ conf = new Configuration();
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
+ conf.setInt(MRJobConfig.NUM_REDUCES, 0);
+ conf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 2048);
+ conf.setInt(MRJobConfig.REDUCE_CPU_VCORES, 10);
+ isUber = testUberDecision(conf);
+ Assert.assertTrue(isUber);
}
private boolean testUberDecision(Configuration conf) {
@@ -728,6 +738,35 @@ public class TestJobImpl {
commitHandler.stop();
}
+ static final String EXCEPTIONMSG = "Splits max exceeded";
+ @Test
+ public void testMetaInfoSizeOverMax() throws Exception {
+ Configuration conf = new Configuration();
+ JobID jobID = JobID.forName("job_1234567890000_0001");
+ JobId jobId = TypeConverter.toYarn(jobID);
+ MRAppMetrics mrAppMetrics = MRAppMetrics.create();
+ JobImpl job =
+ new JobImpl(jobId, ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0, 0), 0), conf, mock(EventHandler.class),
+ null, new JobTokenSecretManager(), new Credentials(), null, null,
+ mrAppMetrics, null, true, null, 0, null, null, null, null);
+ InitTransition initTransition = new InitTransition() {
+ @Override
+ protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
+ throw new YarnRuntimeException(EXCEPTIONMSG);
+ }
+ };
+ JobEvent mockJobEvent = mock(JobEvent.class);
+
+ JobStateInternal jobSI = initTransition.transition(job, mockJobEvent);
+ Assert.assertTrue("When init fails, return value from InitTransition.transition should equal NEW.",
+ jobSI.equals(JobStateInternal.NEW));
+ Assert.assertTrue("Job diagnostics should contain YarnRuntimeException",
+ job.getDiagnostics().toString().contains("YarnRuntimeException"));
+ Assert.assertTrue("Job diagnostics should contain " + EXCEPTIONMSG,
+ job.getDiagnostics().toString().contains(EXCEPTIONMSG));
+ }
+
private static CommitterEventHandler createCommitterEventHandler(
Dispatcher dispatcher, OutputCommitter committer) {
final SystemClock clock = new SystemClock();
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java Tue Aug 19 23:49:39 2014
@@ -20,7 +20,7 @@ package org.apache.hadoop.mapreduce.v2.a
import java.util.Map;
-import junit.framework.Assert;
+import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java Tue Aug 19 23:49:39 2014
@@ -33,7 +33,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-import junit.framework.Assert;
+import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -65,6 +65,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
@@ -795,6 +796,178 @@ public class TestTaskAttempt{
finishTime, Long.valueOf(taImpl.getFinishTime()));
}
+ @Test
+ public void testContainerKillAfterAssigned() throws Exception {
+ ApplicationId appId = ApplicationId.newInstance(1, 2);
+ ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
+ 0);
+ JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+ TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+ TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+ Path jobFile = mock(Path.class);
+
+ MockEventHandler eventHandler = new MockEventHandler();
+ TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+ when(taListener.getAddress()).thenReturn(
+ new InetSocketAddress("localhost", 0));
+
+ JobConf jobConf = new JobConf();
+ jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+ jobConf.setBoolean("fs.file.impl.disable.cache", true);
+ jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+ jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+ TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+ when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
+
+ AppContext appCtx = mock(AppContext.class);
+ ClusterInfo clusterInfo = mock(ClusterInfo.class);
+ Resource resource = mock(Resource.class);
+ when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+ when(resource.getMemory()).thenReturn(1024);
+
+ TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
+ jobFile, 1, splits, jobConf, taListener, new Token(),
+ new Credentials(), new SystemClock(), appCtx);
+
+ NodeId nid = NodeId.newInstance("127.0.0.2", 0);
+ ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(contId);
+ when(container.getNodeId()).thenReturn(nid);
+ when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_SCHEDULE));
+ taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container,
+ mock(Map.class)));
+ assertEquals("Task attempt is not in assinged state",
+ taImpl.getInternalState(), TaskAttemptStateInternal.ASSIGNED);
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_KILL));
+ assertEquals("Task should be in KILLED state",
+ TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+ taImpl.getInternalState());
+ }
+
+ @Test
+ public void testContainerKillWhileRunning() throws Exception {
+ ApplicationId appId = ApplicationId.newInstance(1, 2);
+ ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
+ 0);
+ JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+ TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+ TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+ Path jobFile = mock(Path.class);
+
+ MockEventHandler eventHandler = new MockEventHandler();
+ TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+ when(taListener.getAddress()).thenReturn(
+ new InetSocketAddress("localhost", 0));
+
+ JobConf jobConf = new JobConf();
+ jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+ jobConf.setBoolean("fs.file.impl.disable.cache", true);
+ jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+ jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+ TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+ when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
+
+ AppContext appCtx = mock(AppContext.class);
+ ClusterInfo clusterInfo = mock(ClusterInfo.class);
+ Resource resource = mock(Resource.class);
+ when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+ when(resource.getMemory()).thenReturn(1024);
+
+ TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
+ jobFile, 1, splits, jobConf, taListener, new Token(),
+ new Credentials(), new SystemClock(), appCtx);
+
+ NodeId nid = NodeId.newInstance("127.0.0.2", 0);
+ ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(contId);
+ when(container.getNodeId()).thenReturn(nid);
+ when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_SCHEDULE));
+ taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container,
+ mock(Map.class)));
+ taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+ assertEquals("Task attempt is not in running state", taImpl.getState(),
+ TaskAttemptState.RUNNING);
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_KILL));
+ assertFalse("InternalError occurred trying to handle TA_KILL",
+ eventHandler.internalError);
+ assertEquals("Task should be in KILLED state",
+ TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+ taImpl.getInternalState());
+ }
+
+ @Test
+ public void testContainerKillWhileCommitPending() throws Exception {
+ ApplicationId appId = ApplicationId.newInstance(1, 2);
+ ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
+ 0);
+ JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+ TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+ TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+ Path jobFile = mock(Path.class);
+
+ MockEventHandler eventHandler = new MockEventHandler();
+ TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+ when(taListener.getAddress()).thenReturn(
+ new InetSocketAddress("localhost", 0));
+
+ JobConf jobConf = new JobConf();
+ jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+ jobConf.setBoolean("fs.file.impl.disable.cache", true);
+ jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+ jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+ TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+ when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
+
+ AppContext appCtx = mock(AppContext.class);
+ ClusterInfo clusterInfo = mock(ClusterInfo.class);
+ Resource resource = mock(Resource.class);
+ when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+ when(resource.getMemory()).thenReturn(1024);
+
+ TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
+ jobFile, 1, splits, jobConf, taListener, new Token(),
+ new Credentials(), new SystemClock(), appCtx);
+
+ NodeId nid = NodeId.newInstance("127.0.0.2", 0);
+ ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(contId);
+ when(container.getNodeId()).thenReturn(nid);
+ when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_SCHEDULE));
+ taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container,
+ mock(Map.class)));
+ taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+ assertEquals("Task attempt is not in running state", taImpl.getState(),
+ TaskAttemptState.RUNNING);
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_COMMIT_PENDING));
+ assertEquals("Task should be in COMMIT_PENDING state",
+ TaskAttemptStateInternal.COMMIT_PENDING, taImpl.getInternalState());
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_KILL));
+ assertFalse("InternalError occurred trying to handle TA_KILL",
+ eventHandler.internalError);
+ assertEquals("Task should be in KILLED state",
+ TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+ taImpl.getInternalState());
+ }
+
public static class MockEventHandler implements EventHandler {
public boolean internalError;
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttemptContainerRequest.java Tue Aug 19 23:49:39 2014
@@ -27,7 +27,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
-import junit.framework.Assert;
+import org.junit.Assert;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileStatus;
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java Tue Aug 19 23:49:39 2014
@@ -30,7 +30,7 @@ import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
-import junit.framework.Assert;
+import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -439,4 +439,4 @@ public class TestContainerLauncher {
throw new IOException(e);
}
}
-}
\ No newline at end of file
+}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java Tue Aug 19 23:49:39 2014
@@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData;
@@ -402,7 +403,7 @@ public class TestContainerLauncherImpl {
1234), "password".getBytes(), new ContainerTokenIdentifier(
contId, containerManagerAddr, "user",
Resource.newInstance(1024, 1),
- currentTime + 10000L, 123, currentTime));
+ currentTime + 10000L, 123, currentTime, Priority.newInstance(0), 0));
}
private static class ContainerManagerForTest implements ContainerManagementProtocol {
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java Tue Aug 19 23:49:39 2014
@@ -31,7 +31,7 @@ import java.util.Map.Entry;
import javax.net.ssl.SSLException;
-import junit.framework.Assert;
+import org.junit.Assert;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.hadoop.conf.Configuration;
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestBlocks.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestBlocks.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestBlocks.java Tue Aug 19 23:49:39 2014
@@ -106,6 +106,7 @@ public class TestBlocks {
when(report.getTaskState()).thenReturn(TaskState.SUCCEEDED);
when(report.getStartTime()).thenReturn(100001L);
when(report.getFinishTime()).thenReturn(100011L);
+ when(report.getStatus()).thenReturn("Dummy Status \n*");
when(task.getReport()).thenReturn(report);
@@ -134,6 +135,8 @@ public class TestBlocks {
assertTrue(data.toString().contains("SUCCEEDED"));
assertTrue(data.toString().contains("100001"));
assertTrue(data.toString().contains("100011"));
+ assertFalse(data.toString().contains("Dummy Status \n*"));
+ assertTrue(data.toString().contains("Dummy Status \\n*"));
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java Tue Aug 19 23:49:39 2014
@@ -168,10 +168,14 @@ public class FileNameIndexUtils {
decodeJobHistoryFileName(jobDetails[QUEUE_NAME_INDEX]));
try{
- indexInfo.setJobStartTime(
- Long.parseLong(decodeJobHistoryFileName(jobDetails[JOB_START_TIME_INDEX])));
+ if (jobDetails.length <= JOB_START_TIME_INDEX) {
+ indexInfo.setJobStartTime(indexInfo.getSubmitTime());
+ } else {
+ indexInfo.setJobStartTime(
+ Long.parseLong(decodeJobHistoryFileName(jobDetails[JOB_START_TIME_INDEX])));
+ }
} catch (NumberFormatException e){
- LOG.warn("Unable to parse launch time from job history file "
+ LOG.warn("Unable to parse start time from job history file "
+ jhFileName + " : " + e);
}
} catch (IndexOutOfBoundsException e) {
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java Tue Aug 19 23:49:39 2014
@@ -38,7 +38,9 @@ public class JHAdminConfig {
public static final int DEFAULT_MR_HISTORY_PORT = 10020;
public static final String DEFAULT_MR_HISTORY_ADDRESS = "0.0.0.0:" +
DEFAULT_MR_HISTORY_PORT;
-
+ public static final String MR_HISTORY_BIND_HOST = MR_HISTORY_PREFIX
+ + "bind-host";
+
/** The address of the History server admin interface. */
public static final String JHS_ADMIN_ADDRESS = MR_HISTORY_PREFIX
+ "admin.address";
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java Tue Aug 19 23:49:39 2014
@@ -22,20 +22,24 @@ import java.io.File;
import java.io.IOException;
import java.util.Calendar;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -117,6 +121,7 @@ public class JobHistoryUtils {
public static final String TIMESTAMP_DIR_REGEX = "\\d{4}" + "\\" + Path.SEPARATOR + "\\d{2}" + "\\" + Path.SEPARATOR + "\\d{2}";
public static final Pattern TIMESTAMP_DIR_PATTERN = Pattern.compile(TIMESTAMP_DIR_REGEX);
private static final String TIMESTAMP_DIR_FORMAT = "%04d" + File.separator + "%02d" + File.separator + "%02d";
+ private static final Log LOG = LogFactory.getLog(JobHistoryUtils.class);
private static final PathFilter CONF_FILTER = new PathFilter() {
@Override
@@ -183,7 +188,7 @@ public class JobHistoryUtils {
Path stagingPath = MRApps.getStagingAreaDir(conf, user);
Path path = new Path(stagingPath, jobId);
String logDir = path.toString();
- return logDir;
+ return ensurePathInDefaultFileSystem(logDir, conf);
}
/**
@@ -200,7 +205,7 @@ public class JobHistoryUtils {
MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
+ "/history/done_intermediate";
}
- return doneDirPrefix;
+ return ensurePathInDefaultFileSystem(doneDirPrefix, conf);
}
/**
@@ -216,7 +221,69 @@ public class JobHistoryUtils {
MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
+ "/history/done";
}
- return doneDirPrefix;
+ return ensurePathInDefaultFileSystem(doneDirPrefix, conf);
+ }
+
+ /**
+ * Get default file system URI for the cluster (used to ensure consistency
+ * of history done/staging locations) over different context
+ *
+ * @return Default file context
+ */
+ private static FileContext getDefaultFileContext() {
+ // If FS_DEFAULT_NAME_KEY was set solely by core-default.xml then we ignore
+ // ignore it. This prevents defaulting history paths to file system specified
+ // by core-default.xml which would not make sense in any case. For a test
+ // case to exploit this functionality it should create core-site.xml
+ FileContext fc = null;
+ Configuration defaultConf = new Configuration();
+ String[] sources;
+ sources = defaultConf.getPropertySources(
+ CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY);
+ if (sources != null &&
+ (!Arrays.asList(sources).contains("core-default.xml") ||
+ sources.length > 1)) {
+ try {
+ fc = FileContext.getFileContext(defaultConf);
+ LOG.info("Default file system [" +
+ fc.getDefaultFileSystem().getUri() + "]");
+ } catch (UnsupportedFileSystemException e) {
+ LOG.error("Unable to create default file context [" +
+ defaultConf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY) +
+ "]",
+ e);
+ }
+ }
+ else {
+ LOG.info("Default file system is set solely " +
+ "by core-default.xml therefore - ignoring");
+ }
+
+ return fc;
+ }
+
+ /**
+ * Ensure that path belongs to cluster's default file system unless
+ * 1. it is already fully qualified.
+ * 2. current job configuration uses default file system
+ * 3. running from a test case without core-site.xml
+ *
+ * @param sourcePath source path
+ * @param conf the job configuration
+ * @return full qualified path (if necessary) in default file system
+ */
+ private static String ensurePathInDefaultFileSystem(String sourcePath, Configuration conf) {
+ Path path = new Path(sourcePath);
+ FileContext fc = getDefaultFileContext();
+ if (fc == null ||
+ fc.getDefaultFileSystem().getUri().toString().equals(
+ conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "")) ||
+ path.toUri().getAuthority() != null ||
+ path.toUri().getScheme()!= null) {
+ return sourcePath;
+ }
+
+ return fc.makeQualified(path).toString();
}
/**
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java Tue Aug 19 23:49:39 2014
@@ -33,6 +33,7 @@ import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -326,8 +327,8 @@ public class MRApps extends Apps {
}
/**
- * Sets a {@link ApplicationClassLoader} on the given configuration and as
- * the context classloader, if
+ * Creates and sets a {@link ApplicationClassLoader} on the given
+ * configuration and as the thread context classloader, if
* {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is set to true, and
* the APP_CLASSPATH environment variable is set.
* @param conf
@@ -335,25 +336,58 @@ public class MRApps extends Apps {
*/
public static void setJobClassLoader(Configuration conf)
throws IOException {
+ setClassLoader(createJobClassLoader(conf), conf);
+ }
+
+ /**
+ * Creates a {@link ApplicationClassLoader} if
+ * {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is set to true, and
+ * the APP_CLASSPATH environment variable is set.
+ * @param conf
+ * @returns the created job classloader, or null if the job classloader is not
+ * enabled or the APP_CLASSPATH environment variable is not set
+ * @throws IOException
+ */
+ public static ClassLoader createJobClassLoader(Configuration conf)
+ throws IOException {
+ ClassLoader jobClassLoader = null;
if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)) {
String appClasspath = System.getenv(Environment.APP_CLASSPATH.key());
if (appClasspath == null) {
- LOG.warn("Not using job classloader since APP_CLASSPATH is not set.");
+ LOG.warn("Not creating job classloader since APP_CLASSPATH is not set.");
} else {
- LOG.info("Using job classloader");
+ LOG.info("Creating job classloader");
if (LOG.isDebugEnabled()) {
LOG.debug("APP_CLASSPATH=" + appClasspath);
}
- String[] systemClasses = conf.getStrings(
- MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES);
- ClassLoader jobClassLoader = createJobClassLoader(appClasspath,
+ String[] systemClasses = getSystemClasses(conf);
+ jobClassLoader = createJobClassLoader(appClasspath,
systemClasses);
- if (jobClassLoader != null) {
- conf.setClassLoader(jobClassLoader);
- Thread.currentThread().setContextClassLoader(jobClassLoader);
- }
}
}
+ return jobClassLoader;
+ }
+
+ /**
+ * Sets the provided classloader on the given configuration and as the thread
+ * context classloader if the classloader is not null.
+ * @param classLoader
+ * @param conf
+ */
+ public static void setClassLoader(ClassLoader classLoader,
+ Configuration conf) {
+ if (classLoader != null) {
+ LOG.info("Setting classloader " + classLoader.getClass().getName() +
+ " on the configuration and as the thread context classloader");
+ conf.setClassLoader(classLoader);
+ Thread.currentThread().setContextClassLoader(classLoader);
+ }
+ }
+
+ @VisibleForTesting
+ static String[] getSystemClasses(Configuration conf) {
+ return conf.getTrimmedStrings(
+ MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES);
}
private static ClassLoader createJobClassLoader(final String appClasspath,
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRWebAppUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRWebAppUtil.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRWebAppUtil.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRWebAppUtil.java Tue Aug 19 23:49:39 2014
@@ -29,6 +29,7 @@ import org.apache.hadoop.mapreduce.v2.jo
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -105,11 +106,15 @@ public class MRWebAppUtil {
public static InetSocketAddress getJHSWebBindAddress(Configuration conf) {
if (httpPolicyInJHS == Policy.HTTPS_ONLY) {
- return conf.getSocketAddr(JHAdminConfig.MR_HISTORY_WEBAPP_HTTPS_ADDRESS,
+ return conf.getSocketAddr(
+ JHAdminConfig.MR_HISTORY_BIND_HOST,
+ JHAdminConfig.MR_HISTORY_WEBAPP_HTTPS_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_HTTPS_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_HTTPS_PORT);
} else {
- return conf.getSocketAddr(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
+ return conf.getSocketAddr(
+ JHAdminConfig.MR_HISTORY_BIND_HOST,
+ JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_PORT);
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClient.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClient.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClient.java Tue Aug 19 23:49:39 2014
@@ -23,16 +23,24 @@ import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
public class TestJobClient {
- final static String TEST_DIR = new File(System.getProperty("test.build.data",
- "/tmp")).getAbsolutePath();
+
+ final static String TEST_DIR = new File("target",
+ TestJobClient.class.getSimpleName()).getAbsolutePath();
+
+ @After
+ public void tearDown() {
+ FileUtil.fullyDelete(new File(TEST_DIR));
+ }
@Test
public void testGetClusterStatusWithLocalJobRunner() throws Exception {
@@ -51,11 +59,12 @@ public class TestJobClient {
Assert.assertEquals(0, blackListedTrackersInfo.size());
}
- @Test(timeout = 1000)
+ @Test(timeout = 10000)
public void testIsJobDirValid() throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
Path testDir = new Path(TEST_DIR);
+ fs.mkdirs(testDir);
Assert.assertFalse(JobClient.isJobDirValid(testDir, fs));
Path jobconf = new Path(testDir, "job.xml");
@@ -68,7 +77,7 @@ public class TestJobClient {
fs.delete(jobsplit, true);
}
- @Test(timeout = 1000)
+ @Test(timeout = 10000)
public void testGetStagingAreaDir() throws IOException, InterruptedException {
Configuration conf = new Configuration();
JobClient client = new JobClient(conf);
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClientGetJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClientGetJob.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClientGetJob.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestJobClientGetJob.java Tue Aug 19 23:49:39 2014
@@ -18,7 +18,7 @@
package org.apache.hadoop.mapred;
-import static junit.framework.Assert.assertNotNull;
+import static org.junit.Assert.assertNotNull;
import java.io.IOException;
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java Tue Aug 19 23:49:39 2014
@@ -27,7 +27,7 @@ import java.util.Arrays;
import java.util.jar.JarOutputStream;
import java.util.zip.ZipEntry;
-import junit.framework.Assert;
+import org.junit.Assert;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRPCFactories.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRPCFactories.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRPCFactories.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRPCFactories.java Tue Aug 19 23:49:39 2014
@@ -22,7 +22,7 @@ package org.apache.hadoop.mapreduce.v2;
import java.io.IOException;
import java.net.InetSocketAddress;
-import junit.framework.Assert;
+import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Server;
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRecordFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRecordFactory.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRecordFactory.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/TestRecordFactory.java Tue Aug 19 23:49:39 2014
@@ -18,7 +18,7 @@
package org.apache.hadoop.mapreduce.v2;
-import junit.framework.Assert;
+import org.junit.Assert;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/jobhistory/TestFileNameIndexUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/jobhistory/TestFileNameIndexUtils.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/jobhistory/TestFileNameIndexUtils.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/jobhistory/TestFileNameIndexUtils.java Tue Aug 19 23:49:39 2014
@@ -39,6 +39,17 @@ public class TestFileNameIndexUtils {
+ FileNameIndexUtils.DELIMITER + "%s"
+ JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION;
+ private static final String OLD_FORMAT_BEFORE_ADD_START_TIME = "%s"
+ + FileNameIndexUtils.DELIMITER + "%s"
+ + FileNameIndexUtils.DELIMITER + "%s"
+ + FileNameIndexUtils.DELIMITER + "%s"
+ + FileNameIndexUtils.DELIMITER + "%s"
+ + FileNameIndexUtils.DELIMITER + "%s"
+ + FileNameIndexUtils.DELIMITER + "%s"
+ + FileNameIndexUtils.DELIMITER + "%s"
+ + FileNameIndexUtils.DELIMITER + "%s"
+ + JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION;
+
private static final String JOB_HISTORY_FILE_FORMATTER = "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
+ FileNameIndexUtils.DELIMITER + "%s"
@@ -236,6 +247,22 @@ public class TestFileNameIndexUtils {
}
@Test
+ public void testJobStartTimeBackwardsCompatible() throws IOException{
+ String jobHistoryFile = String.format(OLD_FORMAT_BEFORE_ADD_START_TIME,
+ JOB_ID,
+ SUBMIT_TIME,
+ USER_NAME,
+ JOB_NAME_WITH_DELIMITER_ESCAPE,
+ FINISH_TIME,
+ NUM_MAPS,
+ NUM_REDUCES,
+ JOB_STATUS,
+ QUEUE_NAME );
+ JobIndexInfo info = FileNameIndexUtils.getIndexInfo(jobHistoryFile);
+ Assert.assertEquals(info.getJobStartTime(), info.getSubmitTime());
+ }
+
+ @Test
public void testJobHistoryFileNameBackwardsCompatible() throws IOException {
JobID oldJobId = JobID.forName(JOB_ID);
JobId jobId = TypeConverter.toYarn(oldJobId);
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java Tue Aug 19 23:49:39 2014
@@ -33,6 +33,7 @@ import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
@@ -57,6 +58,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.ApplicationClassLoader;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -400,7 +402,7 @@ public class TestMRApps {
URI file = new URI("mockfs://mock/tmp/something.zip#something");
Path filePath = new Path(file);
URI file2 = new URI("mockfs://mock/tmp/something.txt#something");
- Path file2Path = new Path(file);
+ Path file2Path = new Path(file2);
when(mockFs.resolvePath(filePath)).thenReturn(filePath);
when(mockFs.resolvePath(file2Path)).thenReturn(file2Path);
@@ -492,4 +494,36 @@ public class TestMRApps {
assertTrue(MRApps.TaskStateUI.COMPLETED.correspondsTo(TaskState.KILLED));
assertTrue(MRApps.TaskStateUI.RUNNING.correspondsTo(TaskState.RUNNING));
}
+
+
+ private static final String[] SYS_CLASSES = new String[] {
+ "/java/fake/Klass",
+ "/javax/fake/Klass",
+ "/org/apache/commons/logging/fake/Klass",
+ "/org/apache/log4j/fake/Klass",
+ "/org/apache/hadoop/fake/Klass"
+ };
+
+ private static final String[] DEFAULT_XMLS = new String[] {
+ "core-default.xml",
+ "mapred-default.xml",
+ "hdfs-default.xml",
+ "yarn-default.xml"
+ };
+
+ @Test
+ public void testSystemClasses() {
+ final List<String> systemClasses =
+ Arrays.asList(MRApps.getSystemClasses(new Configuration()));
+ for (String defaultXml : DEFAULT_XMLS) {
+ assertTrue(defaultXml + " must be system resource",
+ ApplicationClassLoader.isSystemClass(defaultXml, systemClasses));
+ }
+ for (String klass : SYS_CLASSES) {
+ assertTrue(klass + " must be system class",
+ ApplicationClassLoader.isSystemClass(klass, systemClasses));
+ }
+ assertFalse("/fake/Klass must not be a system class",
+ ApplicationClassLoader.isSystemClass("/fake/Klass", systemClasses));
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml Tue Aug 19 23:49:39 2014
@@ -85,6 +85,16 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>src/test/resources/recordSpanningMultipleSplits.txt</exclude>
+ <exclude>src/test/resources/testBOM.txt</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
</plugins>
</build>
</project>
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr Tue Aug 19 23:49:39 2014
@@ -92,11 +92,11 @@
}
},
{"name": "jobQueueName", "type": "string"},
- {"name": "workflowId", "type": "string"},
- {"name": "workflowName", "type": "string"},
- {"name": "workflowNodeName", "type": "string"},
- {"name": "workflowAdjacencies", "type": "string"},
- {"name": "workflowTags", "type": "string"}
+ {"name": "workflowId", "type": ["null","string"], "default": null},
+ {"name": "workflowName", "type": ["null","string"], "default": null},
+ {"name": "workflowNodeName", "type": ["null","string"], "default": null},
+ {"name": "workflowAdjacencies", "type": ["null","string"], "default": null},
+ {"name": "workflowTags", "type": ["null","string"], "default": null}
]
},
@@ -136,7 +136,7 @@
{"name": "finishedMaps", "type": "int"},
{"name": "finishedReduces", "type": "int"},
{"name": "jobStatus", "type": "string"},
- {"name": "diagnostics", "type": "string"}
+ {"name": "diagnostics", "type": ["null","string"], "default": null}
]
},
@@ -205,8 +205,8 @@
{"name": "httpPort", "type": "int"},
{"name": "shufflePort", "type": "int"},
{"name": "containerId", "type": "string"},
- {"name": "locality", "type": "string"},
- {"name": "avataar", "type": "string"}
+ {"name": "locality", "type": ["null","string"], "default": null},
+ {"name": "avataar", "type": ["null","string"], "default": null}
]
},
@@ -221,7 +221,7 @@
{"name": "rackname", "type": "string"},
{"name": "status", "type": "string"},
{"name": "error", "type": "string"},
- {"name": "counters", "type": "JhCounters"},
+ {"name": "counters", "type": ["null","JhCounters"], "default": null},
{"name": "clockSplits", "type": { "type": "array", "items": "int"}},
{"name": "cpuUsages", "type": { "type": "array", "items": "int"}},
{"name": "vMemKbytes", "type": { "type": "array", "items": "int"}},
@@ -237,7 +237,7 @@
{"name": "error", "type": "string"},
{"name": "failedDueToAttempt", "type": ["null", "string"] },
{"name": "status", "type": "string"},
- {"name": "counters", "type": "JhCounters"}
+ {"name": "counters", "type": ["null","JhCounters"], "default": null}
]
},
@@ -248,7 +248,7 @@
{"name": "finishTime", "type": "long"},
{"name": "status", "type": "string"},
{"name": "counters", "type": "JhCounters"},
- {"name": "successfulAttemptId", "type": "string"}
+ {"name": "successfulAttemptId", "type": ["null","string"], "default": null}
]
},
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java Tue Aug 19 23:49:39 2014
@@ -295,6 +295,15 @@ public abstract class FileInputFormat<K,
String[] hosts) {
return new FileSplit(file, start, length, hosts);
}
+
+ /**
+ * A factory that makes the split for this class. It can be overridden
+ * by sub-classes to make sub-types
+ */
+ protected FileSplit makeSplit(Path file, long start, long length,
+ String[] hosts, String[] inMemoryHosts) {
+ return new FileSplit(file, start, length, hosts, inMemoryHosts);
+ }
/** Splits files returned by {@link #listStatus(JobConf)} when
* they're too big.*/
@@ -337,22 +346,22 @@ public abstract class FileInputFormat<K,
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
- String[] splitHosts = getSplitHosts(blkLocations,
+ String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
length-bytesRemaining, splitSize, clusterMap);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
- splitHosts));
+ splitHosts[0], splitHosts[1]));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
- String[] splitHosts = getSplitHosts(blkLocations, length
+ String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
- bytesRemaining, bytesRemaining, clusterMap);
splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
- splitHosts));
+ splitHosts[0], splitHosts[1]));
}
} else {
- String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
- splits.add(makeSplit(path, 0, length, splitHosts));
+ String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
+ splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
}
} else {
//Create empty hosts array for zero length files
@@ -538,10 +547,30 @@ public abstract class FileInputFormat<K,
* @param blkLocations The list of block locations
* @param offset
* @param splitSize
- * @return array of hosts that contribute most to this split
+ * @return an array of hosts that contribute most to this split
* @throws IOException
*/
protected String[] getSplitHosts(BlockLocation[] blkLocations,
+ long offset, long splitSize, NetworkTopology clusterMap) throws IOException {
+ return getSplitHostsAndCachedHosts(blkLocations, offset, splitSize,
+ clusterMap)[0];
+ }
+
+ /**
+ * This function identifies and returns the hosts that contribute
+ * most for a given split. For calculating the contribution, rack
+ * locality is treated on par with host locality, so hosts from racks
+ * that contribute the most are preferred over hosts on racks that
+ * contribute less
+ * @param blkLocations The list of block locations
+ * @param offset
+ * @param splitSize
+ * @return two arrays - one of hosts that contribute most to this split, and
+ * one of hosts that contribute most to this split that have the data
+ * cached on them
+ * @throws IOException
+ */
+ private String[][] getSplitHostsAndCachedHosts(BlockLocation[] blkLocations,
long offset, long splitSize, NetworkTopology clusterMap)
throws IOException {
@@ -552,7 +581,8 @@ public abstract class FileInputFormat<K,
//If this is the only block, just return
if (bytesInThisBlock >= splitSize) {
- return blkLocations[startIndex].getHosts();
+ return new String[][] { blkLocations[startIndex].getHosts(),
+ blkLocations[startIndex].getCachedHosts() };
}
long bytesInFirstBlock = bytesInThisBlock;
@@ -639,7 +669,9 @@ public abstract class FileInputFormat<K,
} // for all indices
- return identifyHosts(allTopos.length, racksMap);
+ // We don't yet support cached hosts when bytesInThisBlock > splitSize
+ return new String[][] { identifyHosts(allTopos.length, racksMap),
+ new String[0]};
}
private String[] identifyHosts(int replicationFactor,
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java Tue Aug 19 23:49:39 2014
@@ -184,10 +184,16 @@ public class FileOutputCommitter extends
}
@Override
+ @Deprecated
public boolean isRecoverySupported() {
return true;
}
-
+
+ @Override
+ public boolean isRecoverySupported(JobContext context) throws IOException {
+ return getWrapped(context).isRecoverySupported(context);
+ }
+
@Override
public void recoverTask(TaskAttemptContext context)
throws IOException {
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java Tue Aug 19 23:49:39 2014
@@ -24,6 +24,7 @@ import java.io.DataOutput;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.fs.Path;
/** A section of an input file. Returned by {@link
@@ -33,7 +34,7 @@ import org.apache.hadoop.fs.Path;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FileSplit extends org.apache.hadoop.mapreduce.InputSplit
- implements InputSplit {
+ implements InputSplitWithLocationInfo {
org.apache.hadoop.mapreduce.lib.input.FileSplit fs;
protected FileSplit() {
fs = new org.apache.hadoop.mapreduce.lib.input.FileSplit();
@@ -62,6 +63,20 @@ public class FileSplit extends org.apach
length, hosts);
}
+ /** Constructs a split with host information
+ *
+ * @param file the file name
+ * @param start the position of the first byte in the file to process
+ * @param length the number of bytes in the file to process
+ * @param hosts the list of hosts containing the block, possibly null
+ * @param inMemoryHosts the list of hosts containing the block in memory
+ */
+ public FileSplit(Path file, long start, long length, String[] hosts,
+ String[] inMemoryHosts) {
+ fs = new org.apache.hadoop.mapreduce.lib.input.FileSplit(file, start,
+ length, hosts, inMemoryHosts);
+ }
+
public FileSplit(org.apache.hadoop.mapreduce.lib.input.FileSplit fs) {
this.fs = fs;
}
@@ -92,4 +107,9 @@ public class FileSplit extends org.apach
return fs.getLocations();
}
+ @Override
+ @Evolving
+ public SplitLocationInfo[] getLocationInfo() throws IOException {
+ return fs.getLocationInfo();
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputFormat.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputFormat.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputFormat.java Tue Aug 19 23:49:39 2014
@@ -50,7 +50,7 @@ import org.apache.hadoop.fs.FileSystem;
* bytes, of the input files. However, the {@link FileSystem} blocksize of
* the input files is treated as an upper bound for input splits. A lower bound
* on the split size can be set via
- * <a href="{@docRoot}/../mapred-default.html#mapreduce.input.fileinputformat.split.minsize">
+ * <a href="{@docRoot}/../hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml#mapreduce.input.fileinputformat.split.minsize">
* mapreduce.input.fileinputformat.split.minsize</a>.</p>
*
* <p>Clearly, logical splits based on input-size is insufficient for many
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java Tue Aug 19 23:49:39 2014
@@ -112,7 +112,7 @@ import org.apache.log4j.Level;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class JobConf extends Configuration {
-
+
private static final Log LOG = LogFactory.getLog(JobConf.class);
static{
@@ -882,7 +882,7 @@ public class JobConf extends Configurati
JobContext.KEY_COMPARATOR, null, RawComparator.class);
if (theClass != null)
return ReflectionUtils.newInstance(theClass, this);
- return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class));
+ return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
}
/**
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java Tue Aug 19 23:49:39 2014
@@ -184,7 +184,7 @@ public class LineRecordReader implements
private int maxBytesToConsume(long pos) {
return isCompressedInput()
? Integer.MAX_VALUE
- : (int) Math.min(Integer.MAX_VALUE, end - pos);
+ : (int) Math.max(Math.min(Integer.MAX_VALUE, end - pos), maxLineLength);
}
private long getFilePosition() throws IOException {
@@ -197,6 +197,39 @@ public class LineRecordReader implements
return retVal;
}
+ private int skipUtfByteOrderMark(Text value) throws IOException {
+ // Strip BOM(Byte Order Mark)
+ // Text only support UTF-8, we only need to check UTF-8 BOM
+ // (0xEF,0xBB,0xBF) at the start of the text stream.
+ int newMaxLineLength = (int) Math.min(3L + (long) maxLineLength,
+ Integer.MAX_VALUE);
+ int newSize = in.readLine(value, newMaxLineLength, maxBytesToConsume(pos));
+ // Even we read 3 extra bytes for the first line,
+ // we won't alter existing behavior (no backwards incompat issue).
+ // Because the newSize is less than maxLineLength and
+ // the number of bytes copied to Text is always no more than newSize.
+ // If the return size from readLine is not less than maxLineLength,
+ // we will discard the current line and read the next line.
+ pos += newSize;
+ int textLength = value.getLength();
+ byte[] textBytes = value.getBytes();
+ if ((textLength >= 3) && (textBytes[0] == (byte)0xEF) &&
+ (textBytes[1] == (byte)0xBB) && (textBytes[2] == (byte)0xBF)) {
+ // find UTF-8 BOM, strip it.
+ LOG.info("Found UTF-8 BOM and skipped it");
+ textLength -= 3;
+ newSize -= 3;
+ if (textLength > 0) {
+ // It may work to use the same buffer and not do the copyBytes
+ textBytes = value.copyBytes();
+ value.set(textBytes, 3, textLength);
+ } else {
+ value.clear();
+ }
+ }
+ return newSize;
+ }
+
/** Read a line. */
public synchronized boolean next(LongWritable key, Text value)
throws IOException {
@@ -206,12 +239,17 @@ public class LineRecordReader implements
while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
key.set(pos);
- int newSize = in.readLine(value, maxLineLength,
- Math.max(maxBytesToConsume(pos), maxLineLength));
+ int newSize = 0;
+ if (pos == 0) {
+ newSize = skipUtfByteOrderMark(value);
+ } else {
+ newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
+ pos += newSize;
+ }
+
if (newSize == 0) {
return false;
}
- pos += newSize;
if (newSize < maxLineLength) {
return true;
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java Tue Aug 19 23:49:39 2014
@@ -537,6 +537,8 @@ public class Merger {
}
}
minSegment = top();
+ long startPos = minSegment.getPosition();
+ key = minSegment.getKey();
if (!minSegment.inMemory()) {
//When we load the value from an inmemory segment, we reset
//the "value" DIB in this class to the inmem segment's byte[].
@@ -547,11 +549,11 @@ public class Merger {
//segment, we reset the "value" DIB to the byte[] in that (so
//we reuse the disk segment DIB whenever we consider
//a disk segment).
+ minSegment.getValue(diskIFileValue);
value.reset(diskIFileValue.getData(), diskIFileValue.getLength());
+ } else {
+ minSegment.getValue(value);
}
- long startPos = minSegment.getPosition();
- key = minSegment.getKey();
- minSegment.getValue(value);
long endPos = minSegment.getPosition();
totalBytesProcessed += endPos - startPos;
mergeProgress.set(totalBytesProcessed * progPerByte);
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java Tue Aug 19 23:49:39 2014
@@ -176,15 +176,35 @@ public abstract class OutputCommitter
/**
* This method implements the new interface by calling the old method. Note
- * that the input types are different between the new and old apis and this
- * is a bridge between the two.
+ * that the input types are different between the new and old apis and this is
+ * a bridge between the two.
+ *
+ * @deprecated Use {@link #isRecoverySupported(JobContext)} instead.
*/
+ @Deprecated
@Override
public boolean isRecoverySupported() {
return false;
}
/**
+ * Is task output recovery supported for restarting jobs?
+ *
+ * If task output recovery is supported, job restart can be done more
+ * efficiently.
+ *
+ * @param jobContext
+ * Context of the job whose output is being written.
+ * @return <code>true</code> if task output recovery is supported,
+ * <code>false</code> otherwise
+ * @throws IOException
+ * @see #recoverTask(TaskAttemptContext)
+ */
+ public boolean isRecoverySupported(JobContext jobContext) throws IOException {
+ return isRecoverySupported();
+ }
+
+ /**
* Recover the task output.
*
* The retry-count for the job will be passed via the
@@ -315,4 +335,15 @@ public abstract class OutputCommitter
recoverTask((TaskAttemptContext) taskContext);
}
+ /**
+ * This method implements the new interface by calling the old method. Note
+ * that the input types are different between the new and old apis and this is
+ * a bridge between the two.
+ */
+ @Override
+ public final boolean isRecoverySupported(
+ org.apache.hadoop.mapreduce.JobContext context) throws IOException {
+ return isRecoverySupported((JobContext) context);
+ }
+
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java Tue Aug 19 23:49:39 2014
@@ -66,6 +66,7 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.util.StringUtils;
@@ -322,6 +323,11 @@ abstract public class Task implements Wr
protected void reportFatalError(TaskAttemptID id, Throwable throwable,
String logMsg) {
LOG.fatal(logMsg);
+
+ if (ShutdownHookManager.get().isShutdownInProgress()) {
+ return;
+ }
+
Throwable tCause = throwable.getCause();
String cause = tCause == null
? StringUtils.stringifyException(throwable)
@@ -1120,8 +1126,8 @@ abstract public class Task implements Wr
if (isMapTask() && conf.getNumReduceTasks() > 0) {
try {
Path mapOutput = mapOutputFile.getOutputFile();
- FileSystem fs = mapOutput.getFileSystem(conf);
- return fs.getFileStatus(mapOutput).getLen();
+ FileSystem localFS = FileSystem.getLocal(conf);
+ return localFS.getFileStatus(mapOutput).getLen();
} catch (IOException e) {
LOG.warn ("Could not find output size " , e);
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskCompletionEvent.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskCompletionEvent.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskCompletionEvent.java Tue Aug 19 23:49:39 2014
@@ -90,8 +90,8 @@ public class TaskCompletionEvent
}
/**
- * Returns enum Status.SUCESS or Status.FAILURE.
- * @return task tracker status
+ * Returns {@link Status}
+ * @return task completion status
*/
public Status getTaskStatus() {
return Status.valueOf(super.getStatus().name());
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java Tue Aug 19 23:49:39 2014
@@ -199,16 +199,18 @@ public class TaskLog {
// file first and then rename.
File tmpIndexFile = getTmpIndexFile(currentTaskid, isCleanup);
- BufferedOutputStream bos =
- new BufferedOutputStream(
- SecureIOUtils.createForWrite(tmpIndexFile, 0644));
- DataOutputStream dos = new DataOutputStream(bos);
- //the format of the index file is
- //LOG_DIR: <the dir where the task logs are really stored>
- //STDOUT: <start-offset in the stdout file> <length>
- //STDERR: <start-offset in the stderr file> <length>
- //SYSLOG: <start-offset in the syslog file> <length>
+ BufferedOutputStream bos = null;
+ DataOutputStream dos = null;
try{
+ bos = new BufferedOutputStream(
+ SecureIOUtils.createForWrite(tmpIndexFile, 0644));
+ dos = new DataOutputStream(bos);
+ //the format of the index file is
+ //LOG_DIR: <the dir where the task logs are really stored>
+ //STDOUT: <start-offset in the stdout file> <length>
+ //STDERR: <start-offset in the stderr file> <length>
+ //SYSLOG: <start-offset in the syslog file> <length>
+
dos.writeBytes(LogFileDetail.LOCATION + logLocation + "\n"
+ LogName.STDOUT.toString() + ":");
dos.writeBytes(Long.toString(prevOutLength) + " ");
@@ -225,8 +227,10 @@ public class TaskLog {
+ "\n");
dos.close();
dos = null;
+ bos.close();
+ bos = null;
} finally {
- IOUtils.cleanup(LOG, dos);
+ IOUtils.cleanup(LOG, dos, bos);
}
File indexFile = getIndexFile(currentTaskid, isCleanup);
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeInputFormat.java Tue Aug 19 23:49:39 2014
@@ -36,7 +36,6 @@ import org.apache.hadoop.mapred.Reporter
/**
* An InputFormat capable of performing joins over a set of data sources sorted
* and partitioned the same way.
- * @see #setFormat
*
* A user may define new join types by setting the property
* <tt>mapred.join.define.<ident></tt> to a classname. In the expression
@@ -44,6 +43,7 @@ import org.apache.hadoop.mapred.Reporter
* ComposableRecordReader.
* <tt>mapred.join.keycomparator</tt> can be a classname used to compare keys
* in the join.
+ * @see #setFormat
* @see JoinRecordReader
* @see MultiFilterRecordReader
*/
Modified: hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/join/CompositeRecordReader.java Tue Aug 19 23:49:39 2014
@@ -131,7 +131,7 @@ public abstract class CompositeRecordRea
public void add(ComposableRecordReader<K,? extends V> rr) throws IOException {
kids[rr.id()] = rr;
if (null == q) {
- cmp = WritableComparator.get(rr.createKey().getClass());
+ cmp = WritableComparator.get(rr.createKey().getClass(), conf);
q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
new Comparator<ComposableRecordReader<K,?>>() {
public int compare(ComposableRecordReader<K,?> o1,