You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sz...@apache.org on 2012/10/19 04:28:42 UTC
svn commit: r1399950 [3/11] - in
/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project: ./ bin/ conf/
dev-support/ hadoop-mapreduce-client/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduc...
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java Fri Oct 19 02:25:55 2012
@@ -28,7 +28,6 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -36,23 +35,17 @@ import java.util.Map;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapTaskAttemptImpl;
-import org.apache.hadoop.mapred.WrappedJvmID;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletion;
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@@ -79,18 +72,14 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.SystemClock;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
@@ -101,82 +90,6 @@ import org.mockito.ArgumentCaptor;
@SuppressWarnings({"unchecked", "rawtypes"})
public class TestTaskAttempt{
- @Test
- public void testAttemptContainerRequest() throws Exception {
- //WARNING: This test must run first. This is because there is an
- // optimization where the credentials passed in are cached statically so
- // they do not need to be recomputed when creating a new
- // ContainerLaunchContext. if other tests run first this code will cache
- // their credentials and this test will fail trying to look for the
- // credentials it inserted in.
- final Text SECRET_KEY_ALIAS = new Text("secretkeyalias");
- final byte[] SECRET_KEY = ("secretkey").getBytes();
- Map<ApplicationAccessType, String> acls =
- new HashMap<ApplicationAccessType, String>(1);
- acls.put(ApplicationAccessType.VIEW_APP, "otheruser");
- ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
- JobId jobId = MRBuilderUtils.newJobId(appId, 1);
- TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
- Path jobFile = mock(Path.class);
-
- EventHandler eventHandler = mock(EventHandler.class);
- TaskAttemptListener taListener = mock(TaskAttemptListener.class);
- when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
-
- JobConf jobConf = new JobConf();
- jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
- jobConf.setBoolean("fs.file.impl.disable.cache", true);
- jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
-
- // setup UGI for security so tokens and keys are preserved
- jobConf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
- UserGroupInformation.setConfiguration(jobConf);
-
- Credentials credentials = new Credentials();
- credentials.addSecretKey(SECRET_KEY_ALIAS, SECRET_KEY);
- Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>(
- ("tokenid").getBytes(), ("tokenpw").getBytes(),
- new Text("tokenkind"), new Text("tokenservice"));
-
- TaskAttemptImpl taImpl =
- new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
- mock(TaskSplitMetaInfo.class), jobConf, taListener,
- mock(OutputCommitter.class), jobToken, credentials,
- new SystemClock(), null);
-
- jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, taImpl.getID().toString());
- ContainerId containerId = BuilderUtils.newContainerId(1, 1, 1, 1);
-
- ContainerLaunchContext launchCtx =
- TaskAttemptImpl.createContainerLaunchContext(acls, containerId,
- jobConf, jobToken, taImpl.createRemoteTask(),
- TypeConverter.fromYarn(jobId), mock(Resource.class),
- mock(WrappedJvmID.class), taListener,
- credentials);
-
- Assert.assertEquals("ACLs mismatch", acls, launchCtx.getApplicationACLs());
- Credentials launchCredentials = new Credentials();
-
- DataInputByteBuffer dibb = new DataInputByteBuffer();
- dibb.reset(launchCtx.getContainerTokens());
- launchCredentials.readTokenStorageStream(dibb);
-
- // verify all tokens specified for the task attempt are in the launch context
- for (Token<? extends TokenIdentifier> token : credentials.getAllTokens()) {
- Token<? extends TokenIdentifier> launchToken =
- launchCredentials.getToken(token.getService());
- Assert.assertNotNull("Token " + token.getService() + " is missing",
- launchToken);
- Assert.assertEquals("Token " + token.getService() + " mismatch",
- token, launchToken);
- }
-
- // verify the secret key is in the launch context
- Assert.assertNotNull("Secret key missing",
- launchCredentials.getSecretKey(SECRET_KEY_ALIAS));
- Assert.assertTrue("Secret key mismatch", Arrays.equals(SECRET_KEY,
- launchCredentials.getSecretKey(SECRET_KEY_ALIAS)));
- }
static public class StubbedFS extends RawLocalFileSystem {
@Override
@@ -227,7 +140,7 @@ public class TestTaskAttempt{
//Only a single occurrence of /DefaultRack
assertEquals(1, requestedRacks.length);
}
-
+
@Test
public void testHostResolveAttempt() throws Exception {
TaskAttemptImpl.RequestContainerTransition rct =
@@ -266,7 +179,7 @@ public class TestTaskAttempt{
}
assertEquals(0, expected.size());
}
-
+
@Test
public void testSlotMillisCounterUpdate() throws Exception {
verifySlotMillis(2048, 2048, 1024);
@@ -325,13 +238,13 @@ public class TestTaskAttempt{
.getAllCounters().findCounter(JobCounter.SLOTS_MILLIS_REDUCES)
.getValue());
}
-
+
private TaskAttemptImpl createMapTaskAttemptImplForTest(
EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo) {
Clock clock = new SystemClock();
return createMapTaskAttemptImplForTest(eventHandler, taskSplitMetaInfo, clock);
}
-
+
private TaskAttemptImpl createMapTaskAttemptImplForTest(
EventHandler eventHandler, TaskSplitMetaInfo taskSplitMetaInfo, Clock clock) {
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
@@ -402,30 +315,30 @@ public class TestTaskAttempt{
};
}
}
-
+
@Test
public void testLaunchFailedWhileKilling() throws Exception {
ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
- ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 0);
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
Path jobFile = mock(Path.class);
-
+
MockEventHandler eventHandler = new MockEventHandler();
TaskAttemptListener taListener = mock(TaskAttemptListener.class);
when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
-
+
JobConf jobConf = new JobConf();
jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
jobConf.setBoolean("fs.file.impl.disable.cache", true);
jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
-
+
TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
-
+
TaskAttemptImpl taImpl =
new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
splits, jobConf, taListener,
@@ -437,7 +350,7 @@ public class TestTaskAttempt{
Container container = mock(Container.class);
when(container.getId()).thenReturn(contId);
when(container.getNodeId()).thenReturn(nid);
-
+
taImpl.handle(new TaskAttemptEvent(attemptId,
TaskAttemptEventType.TA_SCHEDULE));
taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
@@ -450,7 +363,7 @@ public class TestTaskAttempt{
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED));
assertFalse(eventHandler.internalError);
}
-
+
@Test
public void testContainerCleanedWhileRunning() throws Exception {
ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
@@ -566,9 +479,76 @@ public class TestTaskAttempt{
eventHandler.internalError);
}
+ @Test
+ public void testDoubleTooManyFetchFailure() throws Exception {
+ ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+ ApplicationAttemptId appAttemptId =
+ BuilderUtils.newApplicationAttemptId(appId, 0);
+ JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+ TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+ TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+ Path jobFile = mock(Path.class);
+
+ MockEventHandler eventHandler = new MockEventHandler();
+ TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+ when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
+
+ JobConf jobConf = new JobConf();
+ jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+ jobConf.setBoolean("fs.file.impl.disable.cache", true);
+ jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+ jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+ TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+ when(splits.getLocations()).thenReturn(new String[] {"127.0.0.1"});
+
+ AppContext appCtx = mock(AppContext.class);
+ ClusterInfo clusterInfo = mock(ClusterInfo.class);
+ Resource resource = mock(Resource.class);
+ when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+ when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
+ when(resource.getMemory()).thenReturn(1024);
+
+ TaskAttemptImpl taImpl =
+ new MapTaskAttemptImpl(taskId, 1, eventHandler, jobFile, 1,
+ splits, jobConf, taListener,
+ mock(OutputCommitter.class), mock(Token.class), new Credentials(),
+ new SystemClock(), appCtx);
+
+ NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+ ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(contId);
+ when(container.getNodeId()).thenReturn(nid);
+ when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_SCHEDULE));
+ taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId,
+ container, mock(Map.class)));
+ taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_DONE));
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_CONTAINER_CLEANED));
+
+ assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
+ TaskAttemptState.SUCCEEDED);
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
+ assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
+ TaskAttemptState.FAILED);
+ taImpl.handle(new TaskAttemptEvent(attemptId,
+ TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE));
+ assertEquals("Task attempt is not in FAILED state, still", taImpl.getState(),
+ TaskAttemptState.FAILED);
+ assertFalse("InternalError occurred trying to handle TA_CONTAINER_CLEANED",
+ eventHandler.internalError);
+ }
+
public static class MockEventHandler implements EventHandler {
public boolean internalError;
-
+
@Override
public void handle(Event event) {
if (event instanceof JobEvent) {
@@ -578,6 +558,6 @@ public class TestTaskAttempt{
}
}
}
-
+
};
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Fri Oct 19 02:25:55 2012
@@ -84,7 +84,6 @@ public class TestTaskImpl {
private ApplicationId appId;
private TaskSplitMetaInfo taskSplitMetaInfo;
private String[] dataLocations = new String[0];
- private final TaskType taskType = TaskType.MAP;
private AppContext appContext;
private int startCount = 0;
@@ -97,6 +96,7 @@ public class TestTaskImpl {
private class MockTaskImpl extends TaskImpl {
private int taskAttemptCounter = 0;
+ TaskType taskType;
public MockTaskImpl(JobId jobId, int partition,
EventHandler eventHandler, Path remoteJobConfFile, JobConf conf,
@@ -104,11 +104,12 @@ public class TestTaskImpl {
Token<JobTokenIdentifier> jobToken,
Credentials credentials, Clock clock,
Map<TaskId, TaskInfo> completedTasksFromPreviousRun, int startCount,
- MRAppMetrics metrics, AppContext appContext) {
+ MRAppMetrics metrics, AppContext appContext, TaskType taskType) {
super(jobId, taskType , partition, eventHandler,
remoteJobConfFile, conf, taskAttemptListener, committer,
jobToken, credentials, clock,
completedTasksFromPreviousRun, startCount, metrics, appContext);
+ this.taskType = taskType;
}
@Override
@@ -120,7 +121,7 @@ public class TestTaskImpl {
protected TaskAttemptImpl createAttempt() {
MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getID(), ++taskAttemptCounter,
eventHandler, taskAttemptListener, remoteJobConfFile, partition,
- conf, committer, jobToken, credentials, clock, appContext);
+ conf, committer, jobToken, credentials, clock, appContext, taskType);
taskAttempts.add(attempt);
return attempt;
}
@@ -142,18 +143,20 @@ public class TestTaskImpl {
private float progress = 0;
private TaskAttemptState state = TaskAttemptState.NEW;
private TaskAttemptId attemptId;
+ private TaskType taskType;
public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
JobConf conf, OutputCommitter committer,
Token<JobTokenIdentifier> jobToken,
Credentials credentials, Clock clock,
- AppContext appContext) {
+ AppContext appContext, TaskType taskType) {
super(taskId, id, eventHandler, taskAttemptListener, jobFile, partition, conf,
dataLocations, committer, jobToken, credentials, clock, appContext);
attemptId = Records.newRecord(TaskAttemptId.class);
attemptId.setId(id);
attemptId.setTaskId(taskId);
+ this.taskType = taskType;
}
public TaskAttemptId getAttemptId() {
@@ -162,7 +165,7 @@ public class TestTaskImpl {
@Override
protected Task createRemoteTask() {
- return new MockTask();
+ return new MockTask(taskType);
}
public float getProgress() {
@@ -185,6 +188,11 @@ public class TestTaskImpl {
private class MockTask extends Task {
+ private TaskType taskType;
+ MockTask(TaskType taskType) {
+ this.taskType = taskType;
+ }
+
@Override
public void run(JobConf job, TaskUmbilicalProtocol umbilical)
throws IOException, ClassNotFoundException, InterruptedException {
@@ -193,7 +201,7 @@ public class TestTaskImpl {
@Override
public boolean isMapTask() {
- return true;
+ return (taskType == TaskType.MAP);
}
}
@@ -227,14 +235,15 @@ public class TestTaskImpl {
taskSplitMetaInfo = mock(TaskSplitMetaInfo.class);
when(taskSplitMetaInfo.getLocations()).thenReturn(dataLocations);
- taskAttempts = new ArrayList<MockTaskAttemptImpl>();
-
- mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
+ taskAttempts = new ArrayList<MockTaskAttemptImpl>();
+ }
+
+ private MockTaskImpl createMockTask(TaskType taskType) {
+ return new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
remoteJobConfFile, conf, taskAttemptListener, committer, jobToken,
credentials, clock,
completedTasksFromPreviousRun, startCount,
- metrics, appContext);
-
+ metrics, appContext, taskType);
}
@After
@@ -342,6 +351,7 @@ public class TestTaskImpl {
@Test
public void testInit() {
LOG.info("--- START: testInit ---");
+ mockTask = createMockTask(TaskType.MAP);
assertTaskNewState();
assert(taskAttempts.size() == 0);
}
@@ -352,6 +362,7 @@ public class TestTaskImpl {
*/
public void testScheduleTask() {
LOG.info("--- START: testScheduleTask ---");
+ mockTask = createMockTask(TaskType.MAP);
TaskId taskId = getNewTaskID();
scheduleTaskAttempt(taskId);
}
@@ -362,6 +373,7 @@ public class TestTaskImpl {
*/
public void testKillScheduledTask() {
LOG.info("--- START: testKillScheduledTask ---");
+ mockTask = createMockTask(TaskType.MAP);
TaskId taskId = getNewTaskID();
scheduleTaskAttempt(taskId);
killTask(taskId);
@@ -374,6 +386,7 @@ public class TestTaskImpl {
*/
public void testKillScheduledTaskAttempt() {
LOG.info("--- START: testKillScheduledTaskAttempt ---");
+ mockTask = createMockTask(TaskType.MAP);
TaskId taskId = getNewTaskID();
scheduleTaskAttempt(taskId);
killScheduledTaskAttempt(getLastAttempt().getAttemptId());
@@ -386,6 +399,7 @@ public class TestTaskImpl {
*/
public void testLaunchTaskAttempt() {
LOG.info("--- START: testLaunchTaskAttempt ---");
+ mockTask = createMockTask(TaskType.MAP);
TaskId taskId = getNewTaskID();
scheduleTaskAttempt(taskId);
launchTaskAttempt(getLastAttempt().getAttemptId());
@@ -398,6 +412,7 @@ public class TestTaskImpl {
*/
public void testKillRunningTaskAttempt() {
LOG.info("--- START: testKillRunningTaskAttempt ---");
+ mockTask = createMockTask(TaskType.MAP);
TaskId taskId = getNewTaskID();
scheduleTaskAttempt(taskId);
launchTaskAttempt(getLastAttempt().getAttemptId());
@@ -407,6 +422,7 @@ public class TestTaskImpl {
@Test
public void testTaskProgress() {
LOG.info("--- START: testTaskProgress ---");
+ mockTask = createMockTask(TaskType.MAP);
// launch task
TaskId taskId = getNewTaskID();
@@ -444,6 +460,7 @@ public class TestTaskImpl {
@Test
public void testFailureDuringTaskAttemptCommit() {
+ mockTask = createMockTask(TaskType.MAP);
TaskId taskId = getNewTaskID();
scheduleTaskAttempt(taskId);
launchTaskAttempt(getLastAttempt().getAttemptId());
@@ -469,8 +486,7 @@ public class TestTaskImpl {
assertTaskSucceededState();
}
- @Test
- public void testSpeculativeTaskAttemptSucceedsEvenIfFirstFails() {
+ private void runSpeculativeTaskAttemptSucceedsEvenIfFirstFails(TaskEventType failEvent) {
TaskId taskId = getNewTaskID();
scheduleTaskAttempt(taskId);
launchTaskAttempt(getLastAttempt().getAttemptId());
@@ -489,11 +505,34 @@ public class TestTaskImpl {
// Now fail the first task attempt, after the second has succeeded
mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(0).getAttemptId(),
- TaskEventType.T_ATTEMPT_FAILED));
+ failEvent));
// The task should still be in the succeeded state
assertTaskSucceededState();
-
+ }
+
+ @Test
+ public void testMapSpeculativeTaskAttemptSucceedsEvenIfFirstFails() {
+ mockTask = createMockTask(TaskType.MAP);
+ runSpeculativeTaskAttemptSucceedsEvenIfFirstFails(TaskEventType.T_ATTEMPT_FAILED);
+ }
+
+ @Test
+ public void testReduceSpeculativeTaskAttemptSucceedsEvenIfFirstFails() {
+ mockTask = createMockTask(TaskType.REDUCE);
+ runSpeculativeTaskAttemptSucceedsEvenIfFirstFails(TaskEventType.T_ATTEMPT_FAILED);
+ }
+
+ @Test
+ public void testMapSpeculativeTaskAttemptSucceedsEvenIfFirstIsKilled() {
+ mockTask = createMockTask(TaskType.MAP);
+ runSpeculativeTaskAttemptSucceedsEvenIfFirstFails(TaskEventType.T_ATTEMPT_KILLED);
+ }
+
+ @Test
+ public void testReduceSpeculativeTaskAttemptSucceedsEvenIfFirstIsKilled() {
+ mockTask = createMockTask(TaskType.REDUCE);
+ runSpeculativeTaskAttemptSucceedsEvenIfFirstFails(TaskEventType.T_ATTEMPT_KILLED);
}
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesJobs.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesJobs.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebServicesJobs.java Fri Oct 19 02:25:55 2012
@@ -972,7 +972,8 @@ public class TestAMWebServicesJobs exten
WebServicesTestUtils.checkStringMatch("containerId", amInfo
.getContainerId().toString(), containerId);
- String localLogsLink = ujoin("node", "containerlogs", containerId);
+ String localLogsLink =ujoin("node", "containerlogs", containerId,
+ job.getUserName());
assertTrue("logsLink", logsLink.contains(localLogsLink));
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml Fri Oct 19 02:25:55 2012
@@ -39,6 +39,10 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</dependency>
<dependency>
@@ -77,7 +81,7 @@
<configuration>
<executable>protoc</executable>
<arguments>
- <argument>-I../../hadoop-yarn/hadoop-yarn-api/src/main/proto/</argument>
+ <argument>-I../../../hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/</argument>
<argument>-Isrc/main/proto/</argument>
<argument>--java_out=target/generated-sources/proto</argument>
<argument>src/main/proto/mr_protos.proto</argument>
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java Fri Oct 19 02:25:55 2012
@@ -18,12 +18,9 @@
package org.apache.hadoop.mapred;
-import com.google.common.collect.Maps;
-
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
-import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLClassLoader;
@@ -34,6 +31,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -60,6 +58,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.FSDownload;
+import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
@@ -85,6 +84,8 @@ class LocalDistributedCacheManager {
* @throws IOException
*/
public void setup(JobConf conf) throws IOException {
+ File workDir = new File(System.getProperty("user.dir"));
+
// Generate YARN local resources objects corresponding to the distributed
// cache configuration
Map<String, LocalResource> localResources =
@@ -132,7 +133,8 @@ class LocalDistributedCacheManager {
Future<Path> future = exec.submit(download);
resourcesToPaths.put(resource, future);
}
- for (LocalResource resource : localResources.values()) {
+ for (Entry<String, LocalResource> entry : localResources.entrySet()) {
+ LocalResource resource = entry.getValue();
Path path;
try {
path = resourcesToPaths.get(resource).get();
@@ -142,10 +144,18 @@ class LocalDistributedCacheManager {
throw new IOException(e);
}
String pathString = path.toUri().toString();
+ String link = entry.getKey();
+ String target = new File(path.toUri()).getPath();
+ symlink(workDir, target, link);
+
if (resource.getType() == LocalResourceType.ARCHIVE) {
localArchives.add(pathString);
} else if (resource.getType() == LocalResourceType.FILE) {
localFiles.add(pathString);
+ } else if (resource.getType() == LocalResourceType.PATTERN) {
+ //PATTERN is not currently used in local mode
+ throw new IllegalArgumentException("Resource type PATTERN is not " +
+ "implemented yet. " + resource.getResource());
}
Path resourcePath;
try {
@@ -175,27 +185,6 @@ class LocalDistributedCacheManager {
.arrayToString(localFiles.toArray(new String[localArchives
.size()])));
}
- if (DistributedCache.getSymlink(conf)) {
- File workDir = new File(System.getProperty("user.dir"));
- URI[] archives = DistributedCache.getCacheArchives(conf);
- URI[] files = DistributedCache.getCacheFiles(conf);
- Path[] localArchives = DistributedCache.getLocalCacheArchives(conf);
- Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
- if (archives != null) {
- for (int i = 0; i < archives.length; i++) {
- String link = archives[i].getFragment();
- String target = new File(localArchives[i].toUri()).getPath();
- symlink(workDir, target, link);
- }
- }
- if (files != null) {
- for (int i = 0; i < files.length; i++) {
- String link = files[i].getFragment();
- String target = new File(localFiles[i].toUri()).getPath();
- symlink(workDir, target, link);
- }
- }
- }
setupCalled = true;
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java Fri Oct 19 02:25:55 2012
@@ -383,6 +383,7 @@ public class TypeConverter {
switch (yarnApplicationState) {
case NEW:
case SUBMITTED:
+ case ACCEPTED:
return State.PREP;
case RUNNING:
return State.RUNNING;
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java Fri Oct 19 02:25:55 2012
@@ -61,11 +61,6 @@ public class JHAdminConfig {
MR_HISTORY_PREFIX + "datestring.cache.size";
public static final int DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE = 200000;
- //TODO REMOVE debug-mode
- /** Equivalent to 0.20 mapreduce.jobhistory.debug.mode */
- public static final String MR_HISTORY_DEBUG_MODE =
- MR_HISTORY_PREFIX + "debug-mode";
-
/** Path where history files should be stored for DONE jobs. **/
public static final String MR_HISTORY_DONE_DIR =
MR_HISTORY_PREFIX + "done-dir";
@@ -133,5 +128,16 @@ public class JHAdminConfig {
* The HistoryStorage class to use to cache history data.
*/
public static final String MR_HISTORY_STORAGE =
- MR_HISTORY_PREFIX + ".store.class";
+ MR_HISTORY_PREFIX + "store.class";
+
+ /** Whether to use fixed ports with the minicluster. */
+ public static final String MR_HISTORY_MINICLUSTER_FIXED_PORTS = MR_HISTORY_PREFIX
+ + "minicluster.fixed.ports";
+
+ /**
+ * Default is false to be able to run tests concurrently without port
+ * conflicts.
+ */
+ public static boolean DEFAULT_MR_HISTORY_MINICLUSTER_FIXED_PORTS = false;
+
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java Fri Oct 19 02:25:55 2012
@@ -79,6 +79,13 @@ public class JobHistoryUtils {
public static final FsPermission HISTORY_DONE_FILE_PERMISSION =
FsPermission.createImmutable((short) 0770); // rwxrwx---
+
+ /**
+ * Umask for the done dir and derivatives.
+ */
+ public static final FsPermission HISTORY_DONE_DIR_UMASK = FsPermission
+ .createImmutable((short) (0770 ^ 0777));
+
/**
* Permissions for the intermediate done directory.
@@ -336,20 +343,19 @@ public class JobHistoryUtils {
/**
* Gets the timestamp component based on millisecond time.
* @param millisecondTime
- * @param debugMode
* @return the timestamp component based on millisecond time
*/
- public static String timestampDirectoryComponent(long millisecondTime, boolean debugMode) {
+ public static String timestampDirectoryComponent(long millisecondTime) {
Calendar timestamp = Calendar.getInstance();
timestamp.setTimeInMillis(millisecondTime);
String dateString = null;
- dateString = String.format(
- TIMESTAMP_DIR_FORMAT,
- timestamp.get(Calendar.YEAR),
- // months are 0-based in Calendar, but people will expect January
- // to be month #1.
- timestamp.get(debugMode ? Calendar.HOUR : Calendar.MONTH) + 1,
- timestamp.get(debugMode ? Calendar.MINUTE : Calendar.DAY_OF_MONTH));
+ dateString = String
+ .format(TIMESTAMP_DIR_FORMAT,
+ timestamp.get(Calendar.YEAR),
+ // months are 0-based in Calendar, but people will expect January to
+ // be month #1.
+ timestamp.get(Calendar.MONTH) + 1,
+ timestamp.get(Calendar.DAY_OF_MONTH));
dateString = dateString.intern();
return dateString;
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java Fri Oct 19 02:25:55 2012
@@ -35,6 +35,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -56,6 +57,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
/**
* Helper class for MR applications
@@ -171,8 +173,15 @@ public class MRApps extends Apps {
}
// Add standard Hadoop classes
- for (String c : conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH)
- .split(",")) {
+ for (String c : conf.getStrings(
+ YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+ YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+ Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), c
+ .trim());
+ }
+ for (String c : conf.getStrings(
+ MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH,
+ MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH)) {
Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), c
.trim());
}
@@ -201,7 +210,15 @@ public class MRApps extends Apps {
Apps.addToEnvironment(
environment,
Environment.CLASSPATH.name(),
- MRJobConfig.JOB_JAR);
+ MRJobConfig.JOB_JAR + Path.SEPARATOR + MRJobConfig.JOB_JAR);
+ Apps.addToEnvironment(
+ environment,
+ Environment.CLASSPATH.name(),
+ MRJobConfig.JOB_JAR + Path.SEPARATOR + "classes" + Path.SEPARATOR);
+ Apps.addToEnvironment(
+ environment,
+ Environment.CLASSPATH.name(),
+ MRJobConfig.JOB_JAR + Path.SEPARATOR + "lib" + Path.SEPARATOR + "*");
Apps.addToEnvironment(
environment,
Environment.CLASSPATH.name(),
@@ -263,6 +280,13 @@ public class MRApps extends Apps {
DistributedCache.getFileClassPaths(conf));
}
+ private static String getResourceDescription(LocalResourceType type) {
+ if(type == LocalResourceType.ARCHIVE || type == LocalResourceType.PATTERN) {
+ return "cache archive (" + MRJobConfig.CACHE_ARCHIVES + ") ";
+ }
+ return "cache file (" + MRJobConfig.CACHE_FILES + ") ";
+ }
+
// TODO - Move this to MR!
// Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[],
// long[], boolean[], Path[], FileType)
@@ -308,6 +332,13 @@ public class MRApps extends Apps {
throw new IllegalArgumentException("Resource name must be relative");
}
String linkName = name.toUri().getPath();
+ LocalResource orig = localResources.get(linkName);
+ if(orig != null && !orig.getResource().equals(
+ ConverterUtils.getYarnUrlFromURI(p.toUri()))) {
+ throw new InvalidJobConfException(
+ getResourceDescription(orig.getType()) + orig.getResource() +
+ " conflicts with " + getResourceDescription(type) + u);
+ }
localResources.put(
linkName,
BuilderUtils.newLocalResource(
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java Fri Oct 19 02:25:55 2012
@@ -67,7 +67,7 @@ public class MRBuilderUtils {
String userName, JobState state, long submitTime, long startTime, long finishTime,
float setupProgress, float mapProgress, float reduceProgress,
float cleanupProgress, String jobFile, List<AMInfo> amInfos,
- boolean isUber) {
+ boolean isUber, String diagnostics) {
JobReport report = Records.newRecord(JobReport.class);
report.setJobId(jobId);
report.setJobName(jobName);
@@ -83,6 +83,7 @@ public class MRBuilderUtils {
report.setJobFile(jobFile);
report.setAMInfos(amInfos);
report.setIsUber(isUber);
+ report.setDiagnostics(diagnostics);
return report;
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java Fri Oct 19 02:25:55 2012
@@ -117,7 +117,8 @@ public class TestMRWithDistributedCache
TestCase.assertEquals("symlink distributed.first.symlink length not 1", 1,
symlinkFile.length());
- TestCase.assertFalse("second file should not be symlinked",
+ //This last one is a difference between MRv2 and MRv1
+ TestCase.assertTrue("second file should be symlinked too",
expectedAbsentSymlinkFile.exists());
}
}
@@ -145,7 +146,6 @@ public class TestMRWithDistributedCache
job.addFileToClassPath(second);
job.addArchiveToClassPath(third);
job.addCacheArchive(fourth.toUri());
- job.createSymlink();
job.setMaxMapAttempts(1); // speed up failures
job.submit();
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java Fri Oct 19 02:25:55 2012
@@ -19,25 +19,33 @@
package org.apache.hadoop.mapreduce.v2.util;
import java.io.IOException;
+import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
-import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.junit.Test;
import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
public class TestMRApps {
@@ -132,11 +140,19 @@ public class TestMRApps {
Map<String, String> environment = new HashMap<String, String>();
MRApps.setClasspath(environment, job.getConfiguration());
assertTrue(environment.get("CLASSPATH").startsWith("$PWD:"));
- String confClasspath = job.getConfiguration().get(YarnConfiguration.YARN_APPLICATION_CLASSPATH);
- if (confClasspath != null) {
- confClasspath = confClasspath.replaceAll(",\\s*", ":").trim();
+ String yarnAppClasspath =
+ job.getConfiguration().get(
+ YarnConfiguration.YARN_APPLICATION_CLASSPATH);
+ if (yarnAppClasspath != null) {
+ yarnAppClasspath = yarnAppClasspath.replaceAll(",\\s*", ":").trim();
}
- assertTrue(environment.get("CLASSPATH").contains(confClasspath));
+ assertTrue(environment.get("CLASSPATH").contains(yarnAppClasspath));
+ String mrAppClasspath =
+ job.getConfiguration().get(MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH);
+ if (mrAppClasspath != null) {
+ mrAppClasspath = mrAppClasspath.replaceAll(",\\s*", ":").trim();
+ }
+ assertTrue(environment.get("CLASSPATH").contains(mrAppClasspath));
}
@Test public void testSetClasspathWithUserPrecendence() {
@@ -150,7 +166,7 @@ public class TestMRApps {
}
String env_str = env.get("CLASSPATH");
assertSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST set, but not taking effect!",
- env_str.indexOf("$PWD:job.jar"), 0);
+ env_str.indexOf("$PWD:job.jar/job.jar:job.jar/classes/:job.jar/lib/*:$PWD/*"), 0);
}
@Test public void testSetClasspathWithNoUserPrecendence() {
@@ -163,8 +179,129 @@ public class TestMRApps {
fail("Got exception while setting classpath");
}
String env_str = env.get("CLASSPATH");
+ int index =
+ env_str.indexOf("job.jar/job.jar:job.jar/classes/:job.jar/lib/*:$PWD/*");
+ assertNotSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, and job.jar is not"
+ + " in the classpath!", index, -1);
assertNotSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, but taking effect!",
- env_str.indexOf("$PWD:job.jar"), 0);
+ index, 0);
}
-
+
+ @Test
+ public void testSetupDistributedCacheEmpty() throws IOException {
+ Configuration conf = new Configuration();
+ Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+ MRApps.setupDistributedCache(conf, localResources);
+ assertTrue("Empty Config did not produce an empty list of resources",
+ localResources.isEmpty());
+ }
+
+ @SuppressWarnings("deprecation")
+ @Test(expected = InvalidJobConfException.class)
+ public void testSetupDistributedCacheConflicts() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
+
+ URI mockUri = URI.create("mockfs://mock/");
+ FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
+ .getRawFileSystem();
+
+ URI archive = new URI("mockfs://mock/tmp/something.zip#something");
+ Path archivePath = new Path(archive);
+ URI file = new URI("mockfs://mock/tmp/something.txt#something");
+ Path filePath = new Path(file);
+
+ when(mockFs.resolvePath(archivePath)).thenReturn(archivePath);
+ when(mockFs.resolvePath(filePath)).thenReturn(filePath);
+
+ DistributedCache.addCacheArchive(archive, conf);
+ conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10");
+ conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10");
+ conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true");
+ DistributedCache.addCacheFile(file, conf);
+ conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11");
+ conf.set(MRJobConfig.CACHE_FILES_SIZES, "11");
+ conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true");
+ Map<String, LocalResource> localResources =
+ new HashMap<String, LocalResource>();
+ MRApps.setupDistributedCache(conf, localResources);
+ }
+
+ @SuppressWarnings("deprecation")
+ @Test(expected = InvalidJobConfException.class)
+ public void testSetupDistributedCacheConflictsFiles() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
+
+ URI mockUri = URI.create("mockfs://mock/");
+ FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
+ .getRawFileSystem();
+
+ URI file = new URI("mockfs://mock/tmp/something.zip#something");
+ Path filePath = new Path(file);
+ URI file2 = new URI("mockfs://mock/tmp/something.txt#something");
+ Path file2Path = new Path(file2);
+
+ when(mockFs.resolvePath(filePath)).thenReturn(filePath);
+ when(mockFs.resolvePath(file2Path)).thenReturn(file2Path);
+
+ DistributedCache.addCacheFile(file, conf);
+ DistributedCache.addCacheFile(file2, conf);
+ conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "10,11");
+ conf.set(MRJobConfig.CACHE_FILES_SIZES, "10,11");
+ conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true,true");
+ Map<String, LocalResource> localResources =
+ new HashMap<String, LocalResource>();
+ MRApps.setupDistributedCache(conf, localResources);
+ }
+
+ @SuppressWarnings("deprecation")
+ @Test
+ public void testSetupDistributedCache() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
+
+ URI mockUri = URI.create("mockfs://mock/");
+ FileSystem mockFs = ((FilterFileSystem)FileSystem.get(mockUri, conf))
+ .getRawFileSystem();
+
+ URI archive = new URI("mockfs://mock/tmp/something.zip");
+ Path archivePath = new Path(archive);
+ URI file = new URI("mockfs://mock/tmp/something.txt#something");
+ Path filePath = new Path(file);
+
+ when(mockFs.resolvePath(archivePath)).thenReturn(archivePath);
+ when(mockFs.resolvePath(filePath)).thenReturn(filePath);
+
+ DistributedCache.addCacheArchive(archive, conf);
+ conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, "10");
+ conf.set(MRJobConfig.CACHE_ARCHIVES_SIZES, "10");
+ conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, "true");
+ DistributedCache.addCacheFile(file, conf);
+ conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "11");
+ conf.set(MRJobConfig.CACHE_FILES_SIZES, "11");
+ conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, "true");
+ Map<String, LocalResource> localResources =
+ new HashMap<String, LocalResource>();
+ MRApps.setupDistributedCache(conf, localResources);
+ assertEquals(2, localResources.size());
+ LocalResource lr = localResources.get("something.zip");
+ assertNotNull(lr);
+ assertEquals(10l, lr.getSize());
+ assertEquals(10l, lr.getTimestamp());
+ assertEquals(LocalResourceType.ARCHIVE, lr.getType());
+ lr = localResources.get("something");
+ assertNotNull(lr);
+ assertEquals(11l, lr.getSize());
+ assertEquals(11l, lr.getTimestamp());
+ assertEquals(LocalResourceType.FILE, lr.getType());
+ }
+
+ static class MockFileSystem extends FilterFileSystem {
+ MockFileSystem() {
+ super(mock(FileSystem.class));
+ }
+ public void initialize(URI name, Configuration conf) throws IOException {}
+ }
+
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml Fri Oct 19 02:25:55 2012
@@ -68,6 +68,24 @@
</executions>
</plugin>
<plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/avro</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java Fri Oct 19 02:25:55 2012
@@ -48,8 +48,12 @@ import org.apache.hadoop.mapreduce.Job;
* Archives (zip, tar and tgz/tar.gz files) are un-archived at the slave nodes.
* Jars may be optionally added to the classpath of the tasks, a rudimentary
* software distribution mechanism. Files have execution permissions.
- * Optionally users can also direct it to symlink the distributed cache file(s)
- * into the working directory of the task.</p>
+ * In older versions of Hadoop Map/Reduce, users could optionally ask for
+ * symlinks to be created in the working directory of the child task. In the
+ * current version, symlinks are always created. If the URL does not have a
+ * fragment, the name of the file or directory will be used. If multiple files
+ * or directories map to the same link name, the last one added will be used.
+ * All others will not even be downloaded.</p>
*
* <p><code>DistributedCache</code> tracks modification timestamps of the cache
* files. Clearly the cache files should not be modified by the application
@@ -91,8 +95,7 @@ import org.apache.hadoop.mapreduce.Job;
*
* public void configure(JobConf job) {
* // Get the cached archives/files
- * localArchives = DistributedCache.getLocalCacheArchives(job);
- * localFiles = DistributedCache.getLocalCacheFiles(job);
+ * File f = new File("./map.zip/some/file/in/zip.txt");
* }
*
* public void map(K key, V value,
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java Fri Oct 19 02:25:55 2012
@@ -340,7 +340,7 @@ public class IFile {
CompressionCodec codec,
Counters.Counter readsCounter) throws IOException {
readRecordsCounter = readsCounter;
- checksumIn = new IFileInputStream(in,length);
+ checksumIn = new IFileInputStream(in,length, conf);
if (codec != null) {
decompressor = CodecPool.getDecompressor(codec);
if (decompressor != null) {
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFileInputStream.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFileInputStream.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFileInputStream.java Fri Oct 19 02:25:55 2012
@@ -19,13 +19,22 @@
package org.apache.hadoop.mapred;
import java.io.EOFException;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.HasFileDescriptor;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
+import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.util.DataChecksum;
/**
* A checksum input stream, used for IFiles.
@@ -35,7 +44,8 @@ import org.apache.hadoop.util.DataChecks
@InterfaceStability.Unstable
public class IFileInputStream extends InputStream {
- private final InputStream in; //The input stream to be verified for checksum.
+ private final InputStream in; //The input stream to be verified for checksum.
+ private final FileDescriptor inFd; // the file descriptor, if it is known
private final long length; //The total length of the input file
private final long dataLength;
private DataChecksum sum;
@@ -43,7 +53,14 @@ public class IFileInputStream extends In
private final byte b[] = new byte[1];
private byte csum[] = null;
private int checksumSize;
-
+
+ private ReadaheadRequest curReadahead = null;
+ private ReadaheadPool raPool = ReadaheadPool.getInstance();
+ private boolean readahead;
+ private int readaheadLength;
+
+ public static final Log LOG = LogFactory.getLog(IFileInputStream.class);
+
private boolean disableChecksumValidation = false;
/**
@@ -51,13 +68,36 @@ public class IFileInputStream extends In
* @param in The input stream to be verified for checksum.
* @param len The length of the input stream including checksum bytes.
*/
- public IFileInputStream(InputStream in, long len) {
+ public IFileInputStream(InputStream in, long len, Configuration conf) {
this.in = in;
- sum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32,
+ this.inFd = getFileDescriptorIfAvail(in);
+ sum = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
Integer.MAX_VALUE);
checksumSize = sum.getChecksumSize();
length = len;
dataLength = length - checksumSize;
+
+ conf = (conf != null) ? conf : new Configuration();
+ readahead = conf.getBoolean(MRConfig.MAPRED_IFILE_READAHEAD,
+ MRConfig.DEFAULT_MAPRED_IFILE_READAHEAD);
+ readaheadLength = conf.getInt(MRConfig.MAPRED_IFILE_READAHEAD_BYTES,
+ MRConfig.DEFAULT_MAPRED_IFILE_READAHEAD_BYTES);
+
+ doReadahead();
+ }
+
+ private static FileDescriptor getFileDescriptorIfAvail(InputStream in) {
+ FileDescriptor fd = null;
+ try {
+ if (in instanceof HasFileDescriptor) {
+ fd = ((HasFileDescriptor)in).getFileDescriptor();
+ } else if (in instanceof FileInputStream) {
+ fd = ((FileInputStream)in).getFD();
+ }
+ } catch (IOException e) {
+ LOG.info("Unable to determine FileDescriptor", e);
+ }
+ return fd;
}
/**
@@ -66,6 +106,10 @@ public class IFileInputStream extends In
*/
@Override
public void close() throws IOException {
+
+ if (curReadahead != null) {
+ curReadahead.cancel();
+ }
if (currentOffset < dataLength) {
byte[] t = new byte[Math.min((int)
(Integer.MAX_VALUE & (dataLength - currentOffset)), 32 * 1024)];
@@ -102,10 +146,21 @@ public class IFileInputStream extends In
if (currentOffset >= dataLength) {
return -1;
}
-
+
+ doReadahead();
+
return doRead(b,off,len);
}
+ private void doReadahead() {
+ if (raPool != null && inFd != null && readahead) {
+ curReadahead = raPool.readaheadStream(
+ "ifile", inFd,
+ currentOffset, readaheadLength, dataLength,
+ curReadahead);
+ }
+ }
+
/**
* Read bytes from the stream.
* At EOF, checksum is validated and sent back
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFileOutputStream.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFileOutputStream.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFileOutputStream.java Fri Oct 19 02:25:55 2012
@@ -49,7 +49,7 @@ public class IFileOutputStream extends F
*/
public IFileOutputStream(OutputStream out) {
super(out);
- sum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32,
+ sum = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
Integer.MAX_VALUE);
barray = new byte[sum.getChecksumSize()];
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IndexCache.java Fri Oct 19 02:25:55 2012
@@ -67,13 +67,13 @@ class IndexCache {
if (info == null) {
info = readIndexFileToCache(fileName, mapId, expectedIndexOwner);
} else {
- while (isUnderConstruction(info)) {
- try {
- // In case the entry is ready after the above check but
- // before the following wait, we do timed wait.
- info.wait(200);
- } catch (InterruptedException e) {
- throw new IOException("Interrupted waiting for construction", e);
+ synchronized(info) {
+ while (isUnderConstruction(info)) {
+ try {
+ info.wait();
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted waiting for construction", e);
+ }
}
}
LOG.debug("IndexCache HIT: MapId " + mapId + " found");
@@ -101,13 +101,13 @@ class IndexCache {
IndexInformation info;
IndexInformation newInd = new IndexInformation();
if ((info = cache.putIfAbsent(mapId, newInd)) != null) {
- while (isUnderConstruction(info)) {
- try {
- // In case the entry is ready after the above check but
- // before the following wait, we do timed wait.
- info.wait(200);
- } catch (InterruptedException e) {
- throw new IOException("Interrupted waiting for construction", e);
+ synchronized(info) {
+ while (isUnderConstruction(info)) {
+ try {
+ info.wait();
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted waiting for construction", e);
+ }
}
}
LOG.debug("IndexCache HIT: MapId " + mapId + " found");
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java Fri Oct 19 02:25:55 2012
@@ -1357,7 +1357,7 @@ public class JobConf extends Configurati
* @return the maximum no. of failures of a given job per tasktracker.
*/
public int getMaxTaskFailuresPerTracker() {
- return getInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 4);
+ return getInt(JobContext.MAX_TASK_FAILURES_PER_TRACKER, 3);
}
/**
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobQueueClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobQueueClient.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobQueueClient.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobQueueClient.java Fri Oct 19 02:25:55 2012
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -184,7 +185,7 @@ class JobQueueClient extends Configured
printJobQueueInfo(jobQueueInfo, new PrintWriter(System.out));
if (showJobs && (jobQueueInfo.getChildren() == null ||
jobQueueInfo.getChildren().size() == 0)) {
- JobStatus[] jobs = jc.getJobsFromQueue(queue);
+ JobStatus[] jobs = jobQueueInfo.getJobStatuses();
if (jobs == null)
jobs = new JobStatus[0];
jc.displayJobList(jobs);
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java Fri Oct 19 02:25:55 2012
@@ -238,7 +238,7 @@ public class JobStatus extends org.apach
stat.getSetupProgress(), stat.getMapProgress(), stat.getReduceProgress(),
stat.getCleanupProgress(), stat.getState().getValue(),
JobPriority.valueOf(stat.getPriority().name()),
- stat.getUsername(), stat.getJobName(), stat.getJobFile(),
+ stat.getUsername(), stat.getJobName(), stat.getQueue(), stat.getJobFile(),
stat.getTrackingUrl(), stat.isUber());
old.setStartTime(stat.getStartTime());
old.setFinishTime(stat.getFinishTime());
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java Fri Oct 19 02:25:55 2012
@@ -424,6 +424,7 @@ class MapTask extends Task {
job.setLong(JobContext.MAP_INPUT_START, fileSplit.getStart());
job.setLong(JobContext.MAP_INPUT_PATH, fileSplit.getLength());
}
+ LOG.info("Processing split: " + inputSplit);
}
static class NewTrackingRecordReader<K,V>
@@ -694,6 +695,7 @@ class MapTask extends Task {
org.apache.hadoop.mapreduce.InputSplit split = null;
split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
splitIndex.getStartOffset());
+ LOG.info("Processing split: " + split);
org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
new NewTrackingRecordReader<INKEY,INVALUE>
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java Fri Oct 19 02:25:55 2012
@@ -66,14 +66,6 @@ public class TaskLog {
// localFS is set in (and used by) writeToIndexFile()
static LocalFileSystem localFS = null;
- static {
- if (!LOG_DIR.exists()) {
- boolean b = LOG_DIR.mkdirs();
- if (!b) {
- LOG.debug("mkdirs failed. Ignoring.");
- }
- }
- }
public static String getMRv2LogDir() {
return System.getProperty(MRJobConfig.TASK_LOG_DIR);
@@ -638,6 +630,12 @@ public class TaskLog {
* @return base log directory
*/
static File getUserLogDir() {
+ if (!LOG_DIR.exists()) {
+ boolean b = LOG_DIR.mkdirs();
+ if (!b) {
+ LOG.debug("mkdirs failed. Ignoring.");
+ }
+ }
return LOG_DIR;
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TotalOrderPartitioner.java Fri Oct 19 02:25:55 2012
@@ -31,7 +31,7 @@ import org.apache.hadoop.mapred.Partitio
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
-public class TotalOrderPartitioner<K extends WritableComparable<?>,V>
+public class TotalOrderPartitioner<K, V>
extends org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner<K, V>
implements Partitioner<K,V> {
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java Fri Oct 19 02:25:55 2012
@@ -313,7 +313,6 @@ public class Submitter extends Configure
// add default debug script only when executable is expressed as
// <path>#<executable>
if (exec.contains("#")) {
- DistributedCache.createSymlink(conf);
// set default gdb commands for map and reduce task
String defScript = "$HADOOP_PREFIX/src/c++/pipes/debug/pipes-default-script";
setIfUnset(conf, MRJobConfig.MAP_DEBUG_SCRIPT,defScript);
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java Fri Oct 19 02:25:55 2012
@@ -131,6 +131,8 @@ public class Job extends JobContextImpl
Job(JobConf conf) throws IOException {
super(conf, null);
+ // propagate existing user credentials to job
+ this.credentials.mergeAll(this.ugi.getCredentials());
this.cluster = null;
}
@@ -1049,9 +1051,10 @@ public class Job extends JobContextImpl
}
/**
- * This method allows you to create symlinks in the current working directory
- * of the task to all the cache files/archives
+ * Originally intended to enable symlinks, but currently symlinks cannot be
+ * disabled.
*/
+ @Deprecated
public void createSymlink() {
ensureState(JobState.DEFINE);
DistributedCache.createSymlink(conf);
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobContext.java Fri Oct 19 02:25:55 2012
@@ -221,10 +221,11 @@ public interface JobContext extends MRJo
public String getUser();
/**
- * This method checks to see if symlinks are to be create for the
- * localized cache files in the current working directory
- * @return true if symlinks are to be created- else return false
+ * Originally intended to check if symlinks should be used, but currently
+ * symlinks cannot be disabled.
+ * @return true
*/
+ @Deprecated
public boolean getSymlink();
/**
@@ -251,14 +252,22 @@ public interface JobContext extends MRJo
* Return the path array of the localized caches
* @return A path array of localized caches
* @throws IOException
+ * @deprecated the array returned only includes the items that were
+ * downloaded. There is no way to map this to what is returned by
+ * {@link #getCacheArchives()}.
*/
+ @Deprecated
public Path[] getLocalCacheArchives() throws IOException;
/**
* Return the path array of the localized files
* @return A path array of localized files
* @throws IOException
+ * @deprecated the array returned only includes the items that were
+ * downloaded. There is no way to map this to what is returned by
+ * {@link #getCacheFiles()}.
*/
+ @Deprecated
public Path[] getLocalCacheFiles() throws IOException;
/**
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java Fri Oct 19 02:25:55 2012
@@ -135,8 +135,9 @@ class JobSubmitter {
short replication) throws IOException {
Configuration conf = job.getConfiguration();
if (!(conf.getBoolean(Job.USED_GENERIC_PARSER, false))) {
- LOG.warn("Use GenericOptionsParser for parsing the arguments. " +
- "Applications should implement Tool for the same.");
+ LOG.warn("Hadoop command-line option parsing not performed. " +
+ "Implement the Tool interface and execute your application " +
+ "with ToolRunner to remedy this.");
}
// get all the command line arguments passed in by the user conf
@@ -189,7 +190,6 @@ class JobSubmitter {
//should not throw a uri exception
throw new IOException("Failed to create uri for " + tmpFile, ue);
}
- DistributedCache.createSymlink(conf);
}
}
@@ -224,7 +224,6 @@ class JobSubmitter {
//should not throw an uri excpetion
throw new IOException("Failed to create uri for " + tmpArchives, ue);
}
- DistributedCache.createSymlink(conf);
}
}
@@ -233,9 +232,17 @@ class JobSubmitter {
if ("".equals(job.getJobName())){
job.setJobName(new Path(jobJar).getName());
}
- copyJar(new Path(jobJar), JobSubmissionFiles.getJobJar(submitJobDir),
- replication);
- job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString());
+ Path jobJarPath = new Path(jobJar);
+ URI jobJarURI = jobJarPath.toUri();
+ // If the job jar is already in fs, we don't need to copy it from local fs
+ if (jobJarURI.getScheme() == null || jobJarURI.getAuthority() == null
+ || !(jobJarURI.getScheme().equals(jtFs.getUri().getScheme())
+ && jobJarURI.getAuthority().equals(
+ jtFs.getUri().getAuthority()))) {
+ copyJar(jobJarPath, JobSubmissionFiles.getJobJar(submitJobDir),
+ replication);
+ job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString());
+ }
} else {
LOG.warn("No job jar file set. User classes may not be found. "+
"See Job or Job#setJar(String).");
@@ -428,13 +435,9 @@ class JobSubmitter {
private void printTokens(JobID jobId,
Credentials credentials) throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Printing tokens for job: " + jobId);
- for(Token<?> token: credentials.getAllTokens()) {
- if (token.getKind().toString().equals("HDFS_DELEGATION_TOKEN")) {
- LOG.debug("Submitting with " + token);
- }
- }
+ LOG.info("Submitting tokens for job: " + jobId);
+ for (Token<?> token: credentials.getAllTokens()) {
+ LOG.info(token);
}
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java Fri Oct 19 02:25:55 2012
@@ -79,4 +79,25 @@ public interface MRConfig {
public static final int MAX_BLOCK_LOCATIONS_DEFAULT = 10;
public static final String MAX_BLOCK_LOCATIONS_KEY =
"mapreduce.job.max.split.locations";
-}
+
+ public static final String SHUFFLE_SSL_ENABLED_KEY =
+ "mapreduce.shuffle.ssl.enabled";
+
+ public static final boolean SHUFFLE_SSL_ENABLED_DEFAULT = false;
+
+ /**
+ * Configuration key to enable/disable IFile readahead.
+ */
+ public static final String MAPRED_IFILE_READAHEAD =
+ "mapreduce.ifile.readahead";
+
+ public static final boolean DEFAULT_MAPRED_IFILE_READAHEAD = true;
+
+ /**
+ * Configuration key to set the IFile readahead length in bytes.
+ */
+ public static final String MAPRED_IFILE_READAHEAD_BYTES =
+ "mapreduce.ifile.readahead.bytes";
+
+ public static final int DEFAULT_MAPRED_IFILE_READAHEAD_BYTES =
+ 4 * 1024 * 1024;}
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Fri Oct 19 02:25:55 2012
@@ -114,6 +114,10 @@ public interface MRJobConfig {
public static final String CACHE_ARCHIVES_VISIBILITIES = "mapreduce.job.cache.archives.visibilities";
+ /**
+ * @deprecated Symlinks are always on and cannot be disabled.
+ */
+ @Deprecated
public static final String CACHE_SYMLINK = "mapreduce.job.cache.symlink.create";
public static final String USER_LOG_RETAIN_HOURS = "mapreduce.job.userlog.retain.hours";
@@ -583,4 +587,18 @@ public interface MRJobConfig {
MR_AM_SECURITY_SERVICE_AUTHORIZATION_CLIENT =
"security.job.client.protocol.acl";
+ /**
+ * CLASSPATH for all YARN MapReduce applications.
+ */
+ public static final String MAPREDUCE_APPLICATION_CLASSPATH =
+ "mapreduce.application.classpath";
+
+ /**
+ * Default CLASSPATH for all YARN MapReduce applications.
+ */
+ public static final String[] DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH = {
+ "$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*",
+ "$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*",
+ };
+
}