You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by at...@apache.org on 2011/11/02 06:35:03 UTC
svn commit: r1196458 [5/19] - in
/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project: ./ assembly/
bin/ conf/ dev-support/ hadoop-mapreduce-client/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/
hadoop-mapreduce-client/hadoop-mapreduce-cli...
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java Wed Nov 2 05:34:31 2011
@@ -24,6 +24,7 @@ import com.google.common.collect.Maps;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -33,6 +34,7 @@ import org.apache.hadoop.mapreduce.FileS
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
@@ -51,12 +53,14 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
public class MockJobs extends MockApps {
@@ -85,6 +89,10 @@ public class MockJobs extends MockApps {
static final Iterator<String> DIAGS = Iterators.cycle(
"Error: java.lang.OutOfMemoryError: Java heap space",
"Lost task tracker: tasktracker.domain/127.0.0.1:40879");
+
+ public static final String NM_HOST = "localhost";
+ public static final int NM_PORT = 1234;
+ public static final int NM_HTTP_PORT = 9999;
static final int DT = 1000000; // ms
@@ -271,6 +279,11 @@ public class MockJobs extends MockApps {
public long getSortFinishTime() {
return 0;
}
+
+ @Override
+ public String getNodeRackName() {
+ return "/default-rack";
+ }
};
}
@@ -488,6 +501,23 @@ public class MockJobs extends MockApps {
public Map<JobACL, AccessControlList> getJobACLs() {
return Collections.<JobACL, AccessControlList>emptyMap();
}
+
+ @Override
+ public List<AMInfo> getAMInfos() {
+ List<AMInfo> amInfoList = new LinkedList<AMInfo>();
+ amInfoList.add(createAMInfo(1));
+ amInfoList.add(createAMInfo(2));
+ return amInfoList;
+ }
};
}
+
+ private static AMInfo createAMInfo(int attempt) {
+ ApplicationAttemptId appAttemptId =
+ BuilderUtils.newApplicationAttemptId(
+ BuilderUtils.newApplicationId(100, 1), attempt);
+ ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
+ return MRBuilderUtils.newAMInfo(appAttemptId, System.currentTimeMillis(),
+ containerId, NM_HOST, NM_PORT, NM_HTTP_PORT);
+ }
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java Wed Nov 2 05:34:31 2011
@@ -219,7 +219,7 @@ public class TestFail {
}
@Override
- protected ContainerManager getCMProxy(ContainerId containerID,
+ protected ContainerManager getCMProxy(ContainerId contianerID,
String containerManagerBindAddr, ContainerToken containerToken)
throws IOException {
try {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java Wed Nov 2 05:34:31 2011
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.MRJob
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Test;
@@ -36,11 +37,15 @@ public class TestMRAppMaster {
public void testMRAppMasterForDifferentUser() throws IOException,
InterruptedException {
String applicationAttemptIdStr = "appattempt_1317529182569_0004_000001";
+ String containerIdStr = "container_1317529182569_0004_000001_1";
String stagingDir = "/tmp/staging";
String userName = "TestAppMasterUser";
ApplicationAttemptId applicationAttemptId = ConverterUtils
.toApplicationAttemptId(applicationAttemptIdStr);
- MRAppMasterTest appMaster = new MRAppMasterTest(applicationAttemptId);
+ ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+ MRAppMasterTest appMaster =
+ new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
+ System.currentTimeMillis());
YarnConfiguration conf = new YarnConfiguration();
conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -54,8 +59,10 @@ class MRAppMasterTest extends MRAppMaste
Path stagingDirPath;
private Configuration conf;
- public MRAppMasterTest(ApplicationAttemptId applicationAttemptId) {
- super(applicationAttemptId);
+ public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
+ ContainerId containerId, String host, int port, int httpPort,
+ long submitTime) {
+ super(applicationAttemptId, containerId, host, port, httpPort, submitTime);
}
@Override
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRClientService.java Wed Nov 2 05:34:31 2011
@@ -32,13 +32,15 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.Phase;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
-import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.client.MRClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
@@ -49,10 +51,8 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.junit.Test;
@@ -83,7 +83,6 @@ public class TestMRClientService {
TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
taskAttemptStatus.id = attempt.getID();
taskAttemptStatus.progress = 0.5f;
- taskAttemptStatus.diagnosticInfo = diagnostic2;
taskAttemptStatus.stateString = "RUNNING";
taskAttemptStatus.taskState = TaskAttemptState.RUNNING;
taskAttemptStatus.phase = Phase.MAP;
@@ -107,8 +106,9 @@ public class TestMRClientService {
GetJobReportRequest gjrRequest =
recordFactory.newRecordInstance(GetJobReportRequest.class);
gjrRequest.setJobId(job.getID());
- Assert.assertNotNull("JobReport is null",
- proxy.getJobReport(gjrRequest).getJobReport());
+ JobReport jr = proxy.getJobReport(gjrRequest).getJobReport();
+ verifyJobReport(jr);
+
GetTaskAttemptCompletionEventsRequest gtaceRequest =
recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsRequest.class);
@@ -127,8 +127,10 @@ public class TestMRClientService {
GetTaskAttemptReportRequest gtarRequest =
recordFactory.newRecordInstance(GetTaskAttemptReportRequest.class);
gtarRequest.setTaskAttemptId(attempt.getID());
- Assert.assertNotNull("TaskAttemptReport is null",
- proxy.getTaskAttemptReport(gtarRequest).getTaskAttemptReport());
+ TaskAttemptReport tar =
+ proxy.getTaskAttemptReport(gtarRequest).getTaskAttemptReport();
+ verifyTaskAttemptReport(tar);
+
GetTaskReportRequest gtrRequest =
recordFactory.newRecordInstance(GetTaskReportRequest.class);
@@ -151,14 +153,12 @@ public class TestMRClientService {
proxy.getTaskReports(gtreportsRequest).getTaskReportList());
List<String> diag = proxy.getDiagnostics(gdRequest).getDiagnosticsList();
- Assert.assertEquals("Num diagnostics not correct", 2 , diag.size());
+ Assert.assertEquals("Num diagnostics not correct", 1 , diag.size());
Assert.assertEquals("Diag 1 not correct",
diagnostic1, diag.get(0).toString());
- Assert.assertEquals("Diag 2 not correct",
- diagnostic2, diag.get(1).toString());
TaskReport taskReport = proxy.getTaskReport(gtrRequest).getTaskReport();
- Assert.assertEquals("Num diagnostics not correct", 2,
+ Assert.assertEquals("Num diagnostics not correct", 1,
taskReport.getDiagnosticsCount());
//send the done signal to the task
@@ -170,6 +170,31 @@ public class TestMRClientService {
app.waitForState(job, JobState.SUCCEEDED);
}
+ private void verifyJobReport(JobReport jr) {
+ Assert.assertNotNull("JobReport is null", jr);
+ List<AMInfo> amInfos = jr.getAMInfos();
+ Assert.assertEquals(1, amInfos.size());
+ Assert.assertEquals(JobState.RUNNING, jr.getJobState());
+ AMInfo amInfo = amInfos.get(0);
+ Assert.assertEquals(MRApp.NM_HOST, amInfo.getNodeManagerHost());
+ Assert.assertEquals(MRApp.NM_PORT, amInfo.getNodeManagerPort());
+ Assert.assertEquals(MRApp.NM_HTTP_PORT, amInfo.getNodeManagerHttpPort());
+ Assert.assertEquals(1, amInfo.getAppAttemptId().getAttemptId());
+ Assert.assertEquals(1, amInfo.getContainerId().getApplicationAttemptId()
+ .getAttemptId());
+ Assert.assertTrue(amInfo.getStartTime() > 0);
+ }
+
+ private void verifyTaskAttemptReport(TaskAttemptReport tar) {
+ Assert.assertEquals(TaskAttemptState.RUNNING, tar.getTaskAttemptState());
+ Assert.assertNotNull("TaskAttemptReport is null", tar);
+ Assert.assertEquals(MRApp.NM_HOST, tar.getNodeManagerHost());
+ Assert.assertEquals(MRApp.NM_PORT, tar.getNodeManagerPort());
+ Assert.assertEquals(MRApp.NM_HTTP_PORT, tar.getNodeManagerHttpPort());
+ Assert.assertEquals(1, tar.getContainerId().getApplicationAttemptId()
+ .getAttemptId());
+ }
+
class MRAppWithClientService extends MRApp {
MRClientService clientService = null;
MRAppWithClientService(int maps, int reduces, boolean autoComplete) {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Wed Nov 2 05:34:31 2011
@@ -34,6 +34,7 @@ import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@@ -44,6 +45,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
@@ -115,8 +117,8 @@ public class TestRMContainerAllocator {
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
- MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING,
- 0, 0, 0, 0, 0, 0, "jobfile"));
+ MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+ 0, 0, 0, 0, 0, 0, "jobfile", null));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
@@ -192,8 +194,8 @@ public class TestRMContainerAllocator {
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
- MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING,
- 0, 0, 0, 0, 0, 0, "jobfile"));
+ MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+ 0, 0, 0, 0, 0, 0, "jobfile", null));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
@@ -258,8 +260,8 @@ public class TestRMContainerAllocator {
JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
Job mockJob = mock(Job.class);
when(mockJob.getReport()).thenReturn(
- MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING,
- 0, 0, 0, 0, 0, 0, "jobfile"));
+ MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+ 0, 0, 0, 0, 0, 0, "jobfile", null));
MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
appAttemptId, mockJob);
@@ -340,10 +342,10 @@ public class TestRMContainerAllocator {
public FakeJob(ApplicationAttemptId appAttemptID, Configuration conf,
int numMaps, int numReduces) {
- super(appAttemptID, conf, null, null, null, null, null, null, null,
- null);
- this.jobId = MRBuilderUtils
- .newJobId(appAttemptID.getApplicationId(), 0);
+ super(MRBuilderUtils.newJobId(appAttemptID.getApplicationId(), 0),
+ appAttemptID, conf, null, null, null, null, null, null, null, null,
+ true, null, System.currentTimeMillis(), null);
+ this.jobId = getID();
this.numMaps = numMaps;
this.numReduces = numReduces;
}
@@ -372,8 +374,8 @@ public class TestRMContainerAllocator {
@Override
public JobReport getReport() {
return MRBuilderUtils.newJobReport(this.jobId, "job", "user",
- JobState.RUNNING, 0, 0, this.setupProgress, this.mapProgress,
- this.reduceProgress, this.cleanupProgress, "jobfile");
+ JobState.RUNNING, 0, 0, 0, this.setupProgress, this.mapProgress,
+ this.reduceProgress, this.cleanupProgress, "jobfile", null);
}
}
@@ -478,6 +480,106 @@ public class TestRMContainerAllocator {
Assert.assertEquals(100.0f, app.getProgress(), 0.0);
}
+ @Test
+ public void testBlackListedNodes() throws Exception {
+
+ LOG.info("Running testBlackListedNodes");
+
+ Configuration conf = new Configuration();
+ conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
+ conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
+
+ MyResourceManager rm = new MyResourceManager(conf);
+ rm.start();
+ DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+ .getDispatcher();
+
+ // Submit the application
+ RMApp app = rm.submitApp(1024);
+ dispatcher.await();
+
+ MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+ amNodeManager.nodeHeartbeat(true);
+ dispatcher.await();
+
+ ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+ .getAppAttemptId();
+ rm.sendAMLaunched(appAttemptId);
+ dispatcher.await();
+
+ JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+ Job mockJob = mock(Job.class);
+ when(mockJob.getReport()).thenReturn(
+ MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+ 0, 0, 0, 0, 0, 0, "jobfile", null));
+ MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+ appAttemptId, mockJob);
+
+ // add resources to scheduler
+ MockNM nodeManager1 = rm.registerNode("h1:1234", 10240);
+ MockNM nodeManager2 = rm.registerNode("h2:1234", 10240);
+ MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
+ dispatcher.await();
+
+ // create the container request
+ ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
+ new String[] { "h1" });
+ allocator.sendRequest(event1);
+
+ // send 1 more request with different resource req
+ ContainerRequestEvent event2 = createReq(jobId, 2, 1024,
+ new String[] { "h2" });
+ allocator.sendRequest(event2);
+
+ // send another request with different resource and priority
+ ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
+ new String[] { "h3" });
+ allocator.sendRequest(event3);
+
+ // this tells the scheduler about the requests
+ // as nodes are not added, no allocations
+ List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+ dispatcher.await();
+ Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+ // Send events to blacklist nodes h1 and h2
+ ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false);
+ allocator.sendFailure(f1);
+ ContainerFailedEvent f2 = createFailEvent(jobId, 1, "h2", false);
+ allocator.sendFailure(f2);
+
+ // update resources in scheduler
+ nodeManager1.nodeHeartbeat(true); // Node heartbeat
+ nodeManager2.nodeHeartbeat(true); // Node heartbeat
+ dispatcher.await();
+
+ assigned = allocator.schedule();
+ dispatcher.await();
+ Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+ // mark h1/h2 as bad nodes
+ nodeManager1.nodeHeartbeat(false);
+ nodeManager2.nodeHeartbeat(false);
+ dispatcher.await();
+
+ assigned = allocator.schedule();
+ dispatcher.await();
+ Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+ nodeManager3.nodeHeartbeat(true); // Node heartbeat
+ dispatcher.await();
+ assigned = allocator.schedule();
+ dispatcher.await();
+
+ Assert.assertTrue("No of assignments must be 3", assigned.size() == 3);
+
+ // validate that all containers are assigned to h3
+ for (TaskAttemptContainerAssignedEvent assig : assigned) {
+ Assert.assertTrue("Assigned container host not correct", "h3".equals(assig
+ .getContainer().getNodeId().getHost()));
+ }
+ }
+
private static class MyFifoScheduler extends FifoScheduler {
public MyFifoScheduler(RMContext rmContext) {
@@ -534,6 +636,19 @@ public class TestRMContainerAllocator {
new String[] { NetworkTopology.DEFAULT_RACK });
}
+ private ContainerFailedEvent createFailEvent(JobId jobId, int taskAttemptId,
+ String host, boolean reduce) {
+ TaskId taskId;
+ if (reduce) {
+ taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
+ } else {
+ taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+ }
+ TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
+ taskAttemptId);
+ return new ContainerFailedEvent(attemptId, host);
+ }
+
private void checkAssignments(ContainerRequestEvent[] requests,
List<TaskAttemptContainerAssignedEvent> assignments,
boolean checkHostMatch) {
@@ -653,6 +768,10 @@ public class TestRMContainerAllocator {
}
}
+ public void sendFailure(ContainerFailedEvent f) {
+ super.handle(f);
+ }
+
// API to be used by tests
public List<TaskAttemptContainerAssignedEvent> schedule() {
// run the scheduler
@@ -672,6 +791,7 @@ public class TestRMContainerAllocator {
protected void startAllocatorThread() {
// override to NOT start thread
}
+
}
public static void main(String[] args) throws Exception {
@@ -681,5 +801,7 @@ public class TestRMContainerAllocator {
t.testMapReduceScheduling();
t.testReportedAppProgress();
t.testReportedAppProgressWithOnlyMaps();
+ t.testBlackListedNodes();
}
+
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Wed Nov 2 05:34:31 2011
@@ -18,6 +18,9 @@
package org.apache.hadoop.mapreduce.v2.app;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
import java.util.Iterator;
import junit.framework.Assert;
@@ -25,9 +28,21 @@ import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
@@ -36,19 +51,35 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.event.EventHandler;
import org.junit.Test;
public class TestRecovery {
private static final Log LOG = LogFactory.getLog(TestRecovery.class);
+ private static Path outputDir = new Path(new File("target",
+ TestRecovery.class.getName()).getAbsolutePath() +
+ Path.SEPARATOR + "out");
+ private static String partFile = "part-r-00000";
+ private Text key1 = new Text("key1");
+ private Text key2 = new Text("key2");
+ private Text val1 = new Text("val1");
+ private Text val2 = new Text("val2");
+
@Test
public void testCrashed() throws Exception {
+
int runCount = 0;
+ long am1StartTimeEst = System.currentTimeMillis();
MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), true, ++runCount);
Configuration conf = new Configuration();
+ conf.setBoolean("mapred.mapper.new-api", true);
+ conf.setBoolean("mapred.reducer.new-api", true);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
long jobStartTime = job.getReport().getStartTime();
@@ -126,12 +157,16 @@ public class TestRecovery {
//stop the app
app.stop();
-
+
//rerun
//in rerun the 1st map will be recovered from previous run
+ long am2StartTimeEst = System.currentTimeMillis();
app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false, ++runCount);
conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+ conf.setBoolean("mapred.mapper.new-api", true);
+ conf.setBoolean("mapred.reducer.new-api", true);
+ conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
@@ -178,8 +213,194 @@ public class TestRecovery {
task1StartTime, mapTask1.getReport().getStartTime());
Assert.assertEquals("Task Finish time not correct",
task1FinishTime, mapTask1.getReport().getFinishTime());
+ Assert.assertEquals(2, job.getAMInfos().size());
+ int attemptNum = 1;
+ // Verify AMInfo
+ for (AMInfo amInfo : job.getAMInfos()) {
+ Assert.assertEquals(attemptNum++, amInfo.getAppAttemptId()
+ .getAttemptId());
+ Assert.assertEquals(amInfo.getAppAttemptId(), amInfo.getContainerId()
+ .getApplicationAttemptId());
+ Assert.assertEquals(MRApp.NM_HOST, amInfo.getNodeManagerHost());
+ Assert.assertEquals(MRApp.NM_PORT, amInfo.getNodeManagerPort());
+ Assert.assertEquals(MRApp.NM_HTTP_PORT, amInfo.getNodeManagerHttpPort());
+ }
+ long am1StartTimeReal = job.getAMInfos().get(0).getStartTime();
+ long am2StartTimeReal = job.getAMInfos().get(1).getStartTime();
+ Assert.assertTrue(am1StartTimeReal >= am1StartTimeEst
+ && am1StartTimeReal <= am2StartTimeEst);
+ Assert.assertTrue(am2StartTimeReal >= am2StartTimeEst
+ && am2StartTimeReal <= System.currentTimeMillis());
+ // TODO Add verification of additional data from jobHistory - whatever was
+ // available in the failed attempt should be available here
}
+ @Test
+ public void testOutputRecovery() throws Exception {
+ int runCount = 0;
+ MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
+ true, ++runCount);
+ Configuration conf = new Configuration();
+ conf.setBoolean("mapred.mapper.new-api", true);
+ conf.setBoolean("mapred.reducer.new-api", true);
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+ Job job = app.submit(conf);
+ app.waitForState(job, JobState.RUNNING);
+ Assert.assertEquals("No of tasks not correct",
+ 3, job.getTasks().size());
+ Iterator<Task> it = job.getTasks().values().iterator();
+ Task mapTask1 = it.next();
+ Task reduceTask1 = it.next();
+
+ // all maps must be running
+ app.waitForState(mapTask1, TaskState.RUNNING);
+
+ TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
+ .next();
+
+ //before sending the TA_DONE, event make sure attempt has come to
+ //RUNNING state
+ app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+
+ //send the done signal to the map
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(
+ task1Attempt1.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ //wait for map task to complete
+ app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+ // Verify the shuffle-port
+ Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+
+ app.waitForState(reduceTask1, TaskState.RUNNING);
+ TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
+
+ // write output corresponding to reduce1
+ writeOutput(reduce1Attempt1, conf);
+
+ //send the done signal to the 1st reduce
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(
+ reduce1Attempt1.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ //wait for first reduce task to complete
+ app.waitForState(reduceTask1, TaskState.SUCCEEDED);
+
+ //stop the app before the job completes.
+ app.stop();
+
+ //rerun
+ //in rerun the map will be recovered from previous run
+ app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
+ ++runCount);
+ conf = new Configuration();
+ conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+ conf.setBoolean("mapred.mapper.new-api", true);
+ conf.setBoolean("mapred.reducer.new-api", true);
+ conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+ conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+ job = app.submit(conf);
+ app.waitForState(job, JobState.RUNNING);
+ Assert.assertEquals("No of tasks not correct",
+ 3, job.getTasks().size());
+ it = job.getTasks().values().iterator();
+ mapTask1 = it.next();
+ reduceTask1 = it.next();
+ Task reduceTask2 = it.next();
+
+ // map will be recovered, no need to send done
+ app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+ // Verify the shuffle-port after recovery
+ task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
+ Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+
+ // first reduce will be recovered, no need to send done
+ app.waitForState(reduceTask1, TaskState.SUCCEEDED);
+
+ app.waitForState(reduceTask2, TaskState.RUNNING);
+
+ TaskAttempt reduce2Attempt = reduceTask2.getAttempts().values()
+ .iterator().next();
+ //before sending the TA_DONE, event make sure attempt has come to
+ //RUNNING state
+ app.waitForState(reduce2Attempt, TaskAttemptState.RUNNING);
+
+ //send the done signal to the 2nd reduce task
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(
+ reduce2Attempt.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ //wait to get it completed
+ app.waitForState(reduceTask2, TaskState.SUCCEEDED);
+
+ app.waitForState(job, JobState.SUCCEEDED);
+ app.verifyCompleted();
+ validateOutput();
+ }
+
+ private void writeOutput(TaskAttempt attempt, Configuration conf)
+ throws Exception {
+ TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
+ TypeConverter.fromYarn(attempt.getID()));
+
+ TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
+ RecordWriter theRecordWriter = theOutputFormat
+ .getRecordWriter(tContext);
+
+ NullWritable nullWritable = NullWritable.get();
+ try {
+ theRecordWriter.write(key1, val1);
+ theRecordWriter.write(null, nullWritable);
+ theRecordWriter.write(null, val1);
+ theRecordWriter.write(nullWritable, val2);
+ theRecordWriter.write(key2, nullWritable);
+ theRecordWriter.write(key1, null);
+ theRecordWriter.write(null, null);
+ theRecordWriter.write(key2, val2);
+ } finally {
+ theRecordWriter.close(tContext);
+ }
+
+ OutputFormat outputFormat = ReflectionUtils.newInstance(
+ tContext.getOutputFormatClass(), conf);
+ OutputCommitter committer = outputFormat.getOutputCommitter(tContext);
+ committer.commitTask(tContext);
+ }
+
+ private void validateOutput() throws IOException {
+ File expectedFile = new File(new Path(outputDir, partFile).toString());
+ StringBuffer expectedOutput = new StringBuffer();
+ expectedOutput.append(key1).append('\t').append(val1).append("\n");
+ expectedOutput.append(val1).append("\n");
+ expectedOutput.append(val2).append("\n");
+ expectedOutput.append(key2).append("\n");
+ expectedOutput.append(key1).append("\n");
+ expectedOutput.append(key2).append('\t').append(val2).append("\n");
+ String output = slurp(expectedFile);
+ Assert.assertEquals(output, expectedOutput.toString());
+ }
+
+ public static String slurp(File f) throws IOException {
+ int len = (int) f.length();
+ byte[] buf = new byte[len];
+ FileInputStream in = new FileInputStream(f);
+ String contents = null;
+ try {
+ in.read(buf, 0, len);
+ contents = new String(buf, "UTF-8");
+ } finally {
+ in.close();
+ }
+ return contents;
+ }
+
+
class MRAppWithHistory extends MRApp {
public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
String testName, boolean cleanOnStart, int startCount) {
@@ -187,6 +408,13 @@ public class TestRecovery {
}
@Override
+ protected ContainerLauncher createContainerLauncher(AppContext context) {
+ MockContainerLauncher launcher = new MockContainerLauncher();
+ launcher.shufflePort = 5467;
+ return launcher;
+ }
+
+ @Override
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
AppContext context) {
JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(context,
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java Wed Nov 2 05:34:31 2011
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.Atomi
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
@@ -473,6 +474,11 @@ public class TestRuntimeEstimators {
public Map<JobACL, AccessControlList> getJobACLs() {
throw new UnsupportedOperationException("Not supported yet.");
}
+
+ @Override
+ public List<AMInfo> getAMInfos() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
}
/*
@@ -682,6 +688,11 @@ public class TestRuntimeEstimators {
public String getNodeHttpAddress() {
throw new UnsupportedOperationException("Not supported yet.");
}
+
+ @Override
+ public String getNodeRackName() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
@Override
public long getLaunchTime() {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml Wed Nov 2 05:34:31 2011
@@ -16,16 +16,17 @@
<parent>
<artifactId>hadoop-mapreduce-client</artifactId>
<groupId>org.apache.hadoop</groupId>
- <version>${hadoop-mapreduce.version}</version>
+ <version>0.24.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
+ <version>0.24.0-SNAPSHOT</version>
<name>hadoop-mapreduce-client-common</name>
<properties>
- <install.file>${project.artifact.file}</install.file>
- <mr.basedir>${project.parent.parent.basedir}</mr.basedir>
+ <!-- Needed for generating FindBugs warnings using parent pom -->
+ <mr.basedir>${project.parent.basedir}/../</mr.basedir>
</properties>
<dependencies>
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java Wed Nov 2 05:34:31 2011
@@ -380,6 +380,7 @@ public class TypeConverter {
public static JobStatus.State fromYarn(YarnApplicationState state) {
switch (state) {
+ case NEW:
case SUBMITTED:
return State.PREP;
case RUNNING:
@@ -425,6 +426,11 @@ public class TypeConverter {
jobStatus.setSchedulingInfo(trackingUrl); // Set AM tracking url
jobStatus.setStartTime(application.getStartTime());
jobStatus.setFailureInfo(application.getDiagnostics());
+ jobStatus.setNeededMem(application.getApplicationResourceUsageReport().getNeededResources().getMemory());
+ jobStatus.setNumReservedSlots(application.getApplicationResourceUsageReport().getNumReservedContainers());
+ jobStatus.setNumUsedSlots(application.getApplicationResourceUsageReport().getNumUsedContainers());
+ jobStatus.setReservedMem(application.getApplicationResourceUsageReport().getReservedResources().getMemory());
+ jobStatus.setUsedMem(application.getApplicationResourceUsageReport().getUsedResources().getMemory());
return jobStatus;
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobReport.java Wed Nov 2 05:34:31 2011
@@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce.v2.api.records;
+import java.util.List;
+
public interface JobReport {
public abstract JobId getJobId();
public abstract JobState getJobState();
@@ -25,6 +27,7 @@ public interface JobReport {
public abstract float getReduceProgress();
public abstract float getCleanupProgress();
public abstract float getSetupProgress();
+ public abstract long getSubmitTime();
public abstract long getStartTime();
public abstract long getFinishTime();
public abstract String getUser();
@@ -32,6 +35,7 @@ public interface JobReport {
public abstract String getTrackingUrl();
public abstract String getDiagnostics();
public abstract String getJobFile();
+ public abstract List<AMInfo> getAMInfos();
public abstract void setJobId(JobId jobId);
public abstract void setJobState(JobState jobState);
@@ -39,6 +43,7 @@ public interface JobReport {
public abstract void setReduceProgress(float progress);
public abstract void setCleanupProgress(float progress);
public abstract void setSetupProgress(float progress);
+ public abstract void setSubmitTime(long submitTime);
public abstract void setStartTime(long startTime);
public abstract void setFinishTime(long finishTime);
public abstract void setUser(String user);
@@ -46,4 +51,5 @@ public interface JobReport {
public abstract void setTrackingUrl(String trackingUrl);
public abstract void setDiagnostics(String diagnostics);
public abstract void setJobFile(String jobFile);
+ public abstract void setAMInfos(List<AMInfo> amInfos);
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskAttemptReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskAttemptReport.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskAttemptReport.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskAttemptReport.java Wed Nov 2 05:34:31 2011
@@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce.v2.api.records;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
public interface TaskAttemptReport {
public abstract TaskAttemptId getTaskAttemptId();
public abstract TaskAttemptState getTaskAttemptState();
@@ -32,6 +34,10 @@ public interface TaskAttemptReport {
public abstract String getDiagnosticInfo();
public abstract String getStateString();
public abstract Phase getPhase();
+ public abstract String getNodeManagerHost();
+ public abstract int getNodeManagerPort();
+ public abstract int getNodeManagerHttpPort();
+ public abstract ContainerId getContainerId();
public abstract void setTaskAttemptId(TaskAttemptId taskAttemptId);
public abstract void setTaskAttemptState(TaskAttemptState taskAttemptState);
@@ -42,6 +48,10 @@ public interface TaskAttemptReport {
public abstract void setDiagnosticInfo(String diagnosticInfo);
public abstract void setStateString(String stateString);
public abstract void setPhase(Phase phase);
+ public abstract void setNodeManagerHost(String nmHost);
+ public abstract void setNodeManagerPort(int nmPort);
+ public abstract void setNodeManagerHttpPort(int nmHttpPort);
+ public abstract void setContainerId(ContainerId containerId);
/**
* Set the shuffle finish time. Applicable only for reduce attempts
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/JobReportPBImpl.java Wed Nov 2 05:34:31 2011
@@ -19,9 +19,14 @@
package org.apache.hadoop.mapreduce.v2.api.records.impl.pb;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.proto.MRProtos.AMInfoProto;
import org.apache.hadoop.mapreduce.v2.proto.MRProtos.JobIdProto;
import org.apache.hadoop.mapreduce.v2.proto.MRProtos.JobReportProto;
import org.apache.hadoop.mapreduce.v2.proto.MRProtos.JobReportProtoOrBuilder;
@@ -31,12 +36,14 @@ import org.apache.hadoop.yarn.api.record
-public class JobReportPBImpl extends ProtoBase<JobReportProto> implements JobReport {
+public class JobReportPBImpl extends ProtoBase<JobReportProto> implements
+ JobReport {
JobReportProto proto = JobReportProto.getDefaultInstance();
JobReportProto.Builder builder = null;
boolean viaProto = false;
private JobId jobId = null;
+ private List<AMInfo> amInfos = null;
public JobReportPBImpl() {
@@ -48,20 +55,23 @@ public class JobReportPBImpl extends Pro
viaProto = true;
}
- public JobReportProto getProto() {
+ public synchronized JobReportProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
- private void mergeLocalToBuilder() {
+ private synchronized void mergeLocalToBuilder() {
if (this.jobId != null) {
builder.setJobId(convertToProtoFormat(this.jobId));
}
+ if (this.amInfos != null) {
+ addAMInfosToProto();
+ }
}
- private void mergeLocalToProto() {
+ private synchronized void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
@@ -69,7 +79,7 @@ public class JobReportPBImpl extends Pro
viaProto = true;
}
- private void maybeInitBuilder() {
+ private synchronized void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = JobReportProto.newBuilder(proto);
}
@@ -78,7 +88,7 @@ public class JobReportPBImpl extends Pro
@Override
- public JobId getJobId() {
+ public synchronized JobId getJobId() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
if (this.jobId != null) {
return this.jobId;
@@ -91,14 +101,14 @@ public class JobReportPBImpl extends Pro
}
@Override
- public void setJobId(JobId jobId) {
+ public synchronized void setJobId(JobId jobId) {
maybeInitBuilder();
if (jobId == null)
builder.clearJobId();
this.jobId = jobId;
}
@Override
- public JobState getJobState() {
+ public synchronized JobState getJobState() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasJobState()) {
return null;
@@ -107,7 +117,7 @@ public class JobReportPBImpl extends Pro
}
@Override
- public void setJobState(JobState jobState) {
+ public synchronized void setJobState(JobState jobState) {
maybeInitBuilder();
if (jobState == null) {
builder.clearJobState();
@@ -116,132 +126,197 @@ public class JobReportPBImpl extends Pro
builder.setJobState(convertToProtoFormat(jobState));
}
@Override
- public float getMapProgress() {
+ public synchronized float getMapProgress() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.getMapProgress());
}
@Override
- public void setMapProgress(float mapProgress) {
+ public synchronized void setMapProgress(float mapProgress) {
maybeInitBuilder();
builder.setMapProgress((mapProgress));
}
@Override
- public float getReduceProgress() {
+ public synchronized float getReduceProgress() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.getReduceProgress());
}
@Override
- public void setReduceProgress(float reduceProgress) {
+ public synchronized void setReduceProgress(float reduceProgress) {
maybeInitBuilder();
builder.setReduceProgress((reduceProgress));
}
@Override
- public float getCleanupProgress() {
+ public synchronized float getCleanupProgress() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.getCleanupProgress());
}
@Override
- public void setCleanupProgress(float cleanupProgress) {
+ public synchronized void setCleanupProgress(float cleanupProgress) {
maybeInitBuilder();
builder.setCleanupProgress((cleanupProgress));
}
@Override
- public float getSetupProgress() {
+ public synchronized float getSetupProgress() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.getSetupProgress());
}
@Override
- public void setSetupProgress(float setupProgress) {
+ public synchronized void setSetupProgress(float setupProgress) {
maybeInitBuilder();
builder.setSetupProgress((setupProgress));
}
+
+ @Override
+ public synchronized long getSubmitTime() {
+ JobReportProtoOrBuilder p = viaProto ? proto : builder;
+ return (p.getSubmitTime());
+ }
+
+ @Override
+ public synchronized void setSubmitTime(long submitTime) {
+ maybeInitBuilder();
+ builder.setSubmitTime((submitTime));
+ }
+
@Override
- public long getStartTime() {
+ public synchronized long getStartTime() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.getStartTime());
}
@Override
- public void setStartTime(long startTime) {
+ public synchronized void setStartTime(long startTime) {
maybeInitBuilder();
builder.setStartTime((startTime));
}
@Override
- public long getFinishTime() {
+ public synchronized long getFinishTime() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.getFinishTime());
}
@Override
- public void setFinishTime(long finishTime) {
+ public synchronized void setFinishTime(long finishTime) {
maybeInitBuilder();
builder.setFinishTime((finishTime));
}
@Override
- public String getUser() {
+ public synchronized String getUser() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.getUser());
}
@Override
- public void setUser(String user) {
+ public synchronized void setUser(String user) {
maybeInitBuilder();
builder.setUser((user));
}
@Override
- public String getJobName() {
+ public synchronized String getJobName() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.getJobName());
}
@Override
- public void setJobName(String jobName) {
+ public synchronized void setJobName(String jobName) {
maybeInitBuilder();
builder.setJobName((jobName));
}
@Override
- public String getTrackingUrl() {
+ public synchronized String getTrackingUrl() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
return (p.getTrackingUrl());
}
@Override
- public void setTrackingUrl(String trackingUrl) {
+ public synchronized void setTrackingUrl(String trackingUrl) {
maybeInitBuilder();
builder.setTrackingUrl(trackingUrl);
}
@Override
- public String getDiagnostics() {
+ public synchronized String getDiagnostics() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
return p.getDiagnostics();
}
@Override
- public void setDiagnostics(String diagnostics) {
+ public synchronized void setDiagnostics(String diagnostics) {
maybeInitBuilder();
builder.setDiagnostics(diagnostics);
}
@Override
- public String getJobFile() {
+ public synchronized String getJobFile() {
JobReportProtoOrBuilder p = viaProto ? proto : builder;
return p.getJobFile();
}
@Override
- public void setJobFile(String jobFile) {
+ public synchronized void setJobFile(String jobFile) {
maybeInitBuilder();
builder.setJobFile(jobFile);
}
+ @Override
+ public synchronized List<AMInfo> getAMInfos() {
+ initAMInfos();
+ return this.amInfos;
+ }
+
+ @Override
+ public synchronized void setAMInfos(List<AMInfo> amInfos) {
+ maybeInitBuilder();
+ if (amInfos == null) {
+ this.builder.clearAmInfos();
+ this.amInfos = null;
+ return;
+ }
+ initAMInfos();
+ this.amInfos.clear();
+ this.amInfos.addAll(amInfos);
+ }
+
+
+ private synchronized void initAMInfos() {
+ if (this.amInfos != null) {
+ return;
+ }
+ JobReportProtoOrBuilder p = viaProto ? proto : builder;
+ List<AMInfoProto> list = p.getAmInfosList();
+
+ this.amInfos = new ArrayList<AMInfo>();
+
+ for (AMInfoProto amInfoProto : list) {
+ this.amInfos.add(convertFromProtoFormat(amInfoProto));
+ }
+ }
+
+ private synchronized void addAMInfosToProto() {
+ maybeInitBuilder();
+ builder.clearAmInfos();
+ if (this.amInfos == null)
+ return;
+ for (AMInfo amInfo : this.amInfos) {
+ builder.addAmInfos(convertToProtoFormat(amInfo));
+ }
+ }
+
+ private AMInfoPBImpl convertFromProtoFormat(AMInfoProto p) {
+ return new AMInfoPBImpl(p);
+ }
+
+ private AMInfoProto convertToProtoFormat(AMInfo t) {
+ return ((AMInfoPBImpl)t).getProto();
+ }
+
private JobIdPBImpl convertFromProtoFormat(JobIdProto p) {
return new JobIdPBImpl(p);
}
@@ -257,7 +332,4 @@ public class JobReportPBImpl extends Pro
private JobState convertFromProtoFormat(JobStateProto e) {
return MRProtoUtils.convertFromProtoFormat(e);
}
-
-
-
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/TaskAttemptReportPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/TaskAttemptReportPBImpl.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/TaskAttemptReportPBImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/impl/pb/TaskAttemptReportPBImpl.java Wed Nov 2 05:34:31 2011
@@ -31,7 +31,10 @@ import org.apache.hadoop.mapreduce.v2.pr
import org.apache.hadoop.mapreduce.v2.proto.MRProtos.TaskAttemptReportProtoOrBuilder;
import org.apache.hadoop.mapreduce.v2.proto.MRProtos.TaskAttemptStateProto;
import org.apache.hadoop.mapreduce.v2.util.MRProtoUtils;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ProtoBase;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
@@ -42,6 +45,7 @@ public class TaskAttemptReportPBImpl ext
private TaskAttemptId taskAttemptId = null;
private Counters counters = null;
+ private ContainerId containerId = null;
public TaskAttemptReportPBImpl() {
@@ -67,6 +71,9 @@ public class TaskAttemptReportPBImpl ext
if (this.counters != null) {
builder.setCounters(convertToProtoFormat(this.counters));
}
+ if (this.containerId != null) {
+ builder.setContainerId(convertToProtoFormat(this.containerId));
+ }
}
private void mergeLocalToProto() {
@@ -255,7 +262,80 @@ public class TaskAttemptReportPBImpl ext
}
builder.setPhase(convertToProtoFormat(phase));
}
+
+ @Override
+ public String getNodeManagerHost() {
+ TaskAttemptReportProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasNodeManagerHost()) {
+ return null;
+ }
+ return p.getNodeManagerHost();
+ }
+
+ @Override
+ public void setNodeManagerHost(String nmHost) {
+ maybeInitBuilder();
+ if (nmHost == null) {
+ builder.clearNodeManagerHost();
+ return;
+ }
+ builder.setNodeManagerHost(nmHost);
+ }
+
+ @Override
+ public int getNodeManagerPort() {
+ TaskAttemptReportProtoOrBuilder p = viaProto ? proto : builder;
+ return (p.getNodeManagerPort());
+ }
+
+ @Override
+ public void setNodeManagerPort(int nmPort) {
+ maybeInitBuilder();
+ builder.setNodeManagerPort(nmPort);
+ }
+
+ @Override
+ public int getNodeManagerHttpPort() {
+ TaskAttemptReportProtoOrBuilder p = viaProto ? proto : builder;
+ return (p.getNodeManagerHttpPort());
+ }
+
+ @Override
+ public void setNodeManagerHttpPort(int nmHttpPort) {
+ maybeInitBuilder();
+ builder.setNodeManagerHttpPort(nmHttpPort);
+ }
+
+ @Override
+ public ContainerId getContainerId() {
+ TaskAttemptReportProtoOrBuilder p = viaProto ? proto : builder;
+ if (containerId != null) {
+ return containerId;
+ } // Else via proto
+ if (!p.hasContainerId()) {
+ return null;
+ }
+ containerId = convertFromProtoFormat(p.getContainerId());
+ return containerId;
+ }
+ @Override
+ public void setContainerId(ContainerId containerId) {
+ maybeInitBuilder();
+ if (containerId == null) {
+ builder.clearContainerId();
+ }
+ this.containerId = containerId;
+ }
+
+ private ContainerIdProto convertToProtoFormat(ContainerId t) {
+ return ((ContainerIdPBImpl)t).getProto();
+ }
+
+ private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
+ return new ContainerIdPBImpl(p);
+ }
+
private CountersPBImpl convertFromProtoFormat(CountersProto p) {
return new CountersPBImpl(p);
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java Wed Nov 2 05:34:31 2011
@@ -29,11 +29,11 @@ import org.apache.hadoop.mapreduce.v2.ap
public class FileNameIndexUtils {
- static final String UNDERSCORE_ESCAPE = "%5F";
static final int JOB_NAME_TRIM_LENGTH = 50;
- //This has to be underscore currently. Untill escape uses DELIMITER.
- static final String DELIMITER = "_";
+ // Sanitize job history file for predictable parsing
+ static final String DELIMITER = "-";
+ static final String DELIMITER_ESCAPE = "%2D";
private static final int JOB_ID_INDEX = 0;
private static final int SUBMIT_TIME_INDEX = 1;
@@ -54,7 +54,7 @@ public class FileNameIndexUtils {
public static String getDoneFileName(JobIndexInfo indexInfo) throws IOException {
StringBuilder sb = new StringBuilder();
//JobId
- sb.append(escapeUnderscores(TypeConverter.fromYarn(indexInfo.getJobId()).toString()));
+ sb.append(escapeDelimiters(TypeConverter.fromYarn(indexInfo.getJobId()).toString()));
sb.append(DELIMITER);
//StartTime
@@ -62,11 +62,11 @@ public class FileNameIndexUtils {
sb.append(DELIMITER);
//UserName
- sb.append(escapeUnderscores(getUserName(indexInfo)));
+ sb.append(escapeDelimiters(getUserName(indexInfo)));
sb.append(DELIMITER);
//JobName
- sb.append(escapeUnderscores(trimJobName(getJobName(indexInfo))));
+ sb.append(escapeDelimiters(trimJobName(getJobName(indexInfo))));
sb.append(DELIMITER);
//FinishTime
@@ -136,13 +136,13 @@ public class FileNameIndexUtils {
*/
public static String encodeJobHistoryFileName(String logFileName)
throws IOException {
- String replacementUnderscoreEscape = null;
+ String replacementDelimiterEscape = null;
- if (logFileName.contains(UNDERSCORE_ESCAPE)) {
- replacementUnderscoreEscape = nonOccursString(logFileName);
+ // Temporarily protect the escape delimiters from encoding
+ if (logFileName.contains(DELIMITER_ESCAPE)) {
+ replacementDelimiterEscape = nonOccursString(logFileName);
- logFileName = replaceStringInstances
- (logFileName, UNDERSCORE_ESCAPE, replacementUnderscoreEscape);
+ logFileName = logFileName.replaceAll(DELIMITER_ESCAPE, replacementDelimiterEscape);
}
String encodedFileName = null;
@@ -154,10 +154,10 @@ public class FileNameIndexUtils {
ioe.setStackTrace(uee.getStackTrace());
throw ioe;
}
-
- if (replacementUnderscoreEscape != null) {
- encodedFileName = replaceStringInstances
- (encodedFileName, replacementUnderscoreEscape, UNDERSCORE_ESCAPE);
+
+ // Restore protected escape delimiters after encoding
+ if (replacementDelimiterEscape != null) {
+ encodedFileName = encodedFileName.replaceAll(replacementDelimiterEscape, DELIMITER_ESCAPE);
}
return encodedFileName;
@@ -214,29 +214,10 @@ public class FileNameIndexUtils {
return in;
}
- private static String escapeUnderscores(String escapee) {
- return replaceStringInstances(escapee, "_", UNDERSCORE_ESCAPE);
+ private static String escapeDelimiters(String escapee) {
+ return escapee.replaceAll(DELIMITER, DELIMITER_ESCAPE);
}
-
- // I tolerate this code because I expect a low number of
- // occurrences in a relatively short string
- private static String replaceStringInstances
- (String logFileName, String old, String replacement) {
- int index = logFileName.indexOf(old);
-
- while (index > 0) {
- logFileName = (logFileName.substring(0, index)
- + replacement
- + replaceStringInstances
- (logFileName.substring(index + old.length()),
- old, replacement));
- index = logFileName.indexOf(old);
- }
-
- return logFileName;
- }
-
/**
* Trims the job-name if required
*/
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java Wed Nov 2 05:34:31 2011
@@ -33,7 +33,9 @@ public class JHAdminConfig {
/** host:port address for History Server API.*/
public static final String MR_HISTORY_ADDRESS = MR_HISTORY_PREFIX + "address";
- public static final String DEFAULT_MR_HISTORY_ADDRESS = "0.0.0.0:10020";
+ public static final int DEFAULT_MR_HISTORY_PORT = 10020;
+ public static final String DEFAULT_MR_HISTORY_ADDRESS = "0.0.0.0:" +
+ DEFAULT_MR_HISTORY_PORT;
/** If history cleaning should be enabled or not.*/
public static final String MR_HISTORY_CLEANER_ENABLE =
@@ -106,6 +108,7 @@ public class JHAdminConfig {
/**The address the history server webapp is on.*/
public static final String MR_HISTORY_WEBAPP_ADDRESS =
MR_HISTORY_PREFIX + "webapp.address";
+ public static final int DEFAULT_MR_HISTORY_WEBAPP_PORT = 19888;
public static final String DEFAULT_MR_HISTORY_WEBAPP_ADDRESS =
- "0.0.0.0:19888";
+ "0.0.0.0:" + DEFAULT_MR_HISTORY_WEBAPP_PORT;
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java Wed Nov 2 05:34:31 2011
@@ -480,7 +480,9 @@ public class JobHistoryUtils {
//construct the history url for job
String hsAddress = conf.get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_ADDRESS);
- InetSocketAddress address = NetUtils.createSocketAddr(hsAddress);
+ InetSocketAddress address = NetUtils.createSocketAddr(
+ hsAddress, JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_PORT,
+ JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_ADDRESS);
StringBuffer sb = new StringBuffer();
if (address.getAddress().isAnyLocalAddress() ||
address.getAddress().isLoopbackAddress()) {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java Wed Nov 2 05:34:31 2011
@@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.yarn.ContainerLogAppender;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -182,17 +183,18 @@ public class MRApps extends Apps {
reader = new BufferedReader(new InputStreamReader(classpathFileStream));
String cp = reader.readLine();
if (cp != null) {
- addToEnvironment(environment, Environment.CLASSPATH.name(), cp.trim());
+ Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), cp.trim());
}
// Put the file itself on classpath for tasks.
- addToEnvironment(
+ Apps.addToEnvironment(
environment,
Environment.CLASSPATH.name(),
- thisClassLoader.getResource(mrAppGeneratedClasspathFile).getFile());
+ thisClassLoader.getResource(mrAppGeneratedClasspathFile).getFile()
+ .split("!")[0]);
// Add standard Hadoop classes
for (String c : ApplicationConstants.APPLICATION_CLASSPATH) {
- addToEnvironment(environment, Environment.CLASSPATH.name(), c);
+ Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), c);
}
} finally {
if (classpathFileStream != null) {
@@ -205,28 +207,13 @@ public class MRApps extends Apps {
// TODO: Remove duplicates.
}
- private static final String SYSTEM_PATH_SEPARATOR =
- System.getProperty("path.separator");
-
- public static void addToEnvironment(
- Map<String, String> environment,
- String variable, String value) {
- String val = environment.get(variable);
- if (val == null) {
- val = value;
- } else {
- val = val + SYSTEM_PATH_SEPARATOR + value;
- }
- environment.put(variable, val);
- }
-
public static void setClasspath(Map<String, String> environment)
throws IOException {
- MRApps.addToEnvironment(
+ Apps.addToEnvironment(
environment,
Environment.CLASSPATH.name(),
MRJobConfig.JOB_JAR);
- MRApps.addToEnvironment(
+ Apps.addToEnvironment(
environment,
Environment.CLASSPATH.name(),
Environment.PWD.$() + Path.SEPARATOR + "*");
@@ -355,43 +342,19 @@ public class MRApps extends Apps {
}
return result;
}
-
- public static void setEnvFromInputString(Map<String, String> env,
- String envString) {
- if (envString != null && envString.length() > 0) {
- String childEnvs[] = envString.split(",");
- for (String cEnv : childEnvs) {
- String[] parts = cEnv.split("="); // split on '='
- String value = env.get(parts[0]);
- if (value != null) {
- // Replace $env with the child's env constructed by NM's
- // For example: LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp
- value = parts[1].replace("$" + parts[0], value);
- } else {
- // example PATH=$PATH:/tmp
- value = System.getenv(parts[0]);
- if (value != null) {
- // the env key is present in the tt's env
- value = parts[1].replace("$" + parts[0], value);
- } else {
- // check for simple variable substitution
- // for e.g. ROOT=$HOME
- String envValue = System.getenv(parts[1].substring(1));
- if (envValue != null) {
- value = envValue;
- } else {
- // the env key is note present anywhere .. simply set it
- // example X=$X:/tmp or X=/tmp
- value = parts[1].replace("$" + parts[0], "");
- }
- }
- }
- addToEnvironment(env, parts[0], value);
- }
- }
+ /**
+ * Add the JVM system properties necessary to configure {@link ContainerLogAppender}.
+ * @param logLevel the desired log level (eg INFO/WARN/DEBUG)
+ * @param logSize See {@link ContainerLogAppender#setTotalLogFileSize(long)}
+ * @param vargs the argument list to append to
+ */
+ public static void addLog4jSystemProperties(
+ String logLevel, long logSize, List<String> vargs) {
+ vargs.add("-Dlog4j.configuration=container-log4j.properties");
+ vargs.add("-D" + MRJobConfig.TASK_LOG_DIR + "=" +
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR);
+ vargs.add("-D" + MRJobConfig.TASK_LOG_SIZE + "=" + logSize);
+ vargs.add("-Dhadoop.root.logger=" + logLevel + ",CLA");
}
-
-
-
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRBuilderUtils.java Wed Nov 2 05:34:31 2011
@@ -18,13 +18,18 @@
package org.apache.hadoop.mapreduce.v2.util;
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.Records;
public class MRBuilderUtils {
@@ -53,14 +58,15 @@ public class MRBuilderUtils {
}
public static JobReport newJobReport(JobId jobId, String jobName,
- String userName, JobState state, long startTime, long finishTime,
+ String userName, JobState state, long submitTime, long startTime, long finishTime,
float setupProgress, float mapProgress, float reduceProgress,
- float cleanupProgress, String jobFile) {
+ float cleanupProgress, String jobFile, List<AMInfo> amInfos) {
JobReport report = Records.newRecord(JobReport.class);
report.setJobId(jobId);
report.setJobName(jobName);
report.setUser(userName);
report.setJobState(state);
+ report.setSubmitTime(submitTime);
report.setStartTime(startTime);
report.setFinishTime(finishTime);
report.setSetupProgress(setupProgress);
@@ -68,6 +74,20 @@ public class MRBuilderUtils {
report.setMapProgress(mapProgress);
report.setReduceProgress(reduceProgress);
report.setJobFile(jobFile);
+ report.setAMInfos(amInfos);
return report;
}
+
+ public static AMInfo newAMInfo(ApplicationAttemptId appAttemptId,
+ long startTime, ContainerId containerId, String nmHost, int nmPort,
+ int nmHttpPort) {
+ AMInfo amInfo = Records.newRecord(AMInfo.class);
+ amInfo.setAppAttemptId(appAttemptId);
+ amInfo.setStartTime(startTime);
+ amInfo.setContainerId(containerId);
+ amInfo.setNodeManagerHost(nmHost);
+ amInfo.setNodeManagerPort(nmPort);
+ amInfo.setNodeManagerHttpPort(nmHttpPort);
+ return amInfo;
+ }
}
\ No newline at end of file
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto Wed Nov 2 05:34:31 2011
@@ -119,6 +119,10 @@ message TaskAttemptReportProto {
optional PhaseProto phase = 9;
optional int64 shuffle_finish_time = 10;
optional int64 sort_finish_time=11;
+ optional string node_manager_host = 12;
+ optional int32 node_manager_port = 13;
+ optional int32 node_manager_http_port = 14;
+ optional ContainerIdProto container_id = 15;
}
enum JobStateProto {
@@ -146,6 +150,17 @@ message JobReportProto {
optional string trackingUrl = 11;
optional string diagnostics = 12;
optional string jobFile = 13;
+ repeated AMInfoProto am_infos = 14;
+ optional int64 submit_time = 15;
+}
+
+message AMInfoProto {
+ optional ApplicationAttemptIdProto application_attempt_id = 1;
+ optional int64 start_time = 2;
+ optional ContainerIdProto container_id = 3;
+ optional string node_manager_host = 4;
+ optional int32 node_manager_port = 5;
+ optional int32 node_manager_http_port = 6;
}
enum TaskAttemptCompletionEventStatusProto {
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java Wed Nov 2 05:34:31 2011
@@ -20,12 +20,18 @@ package org.apache.hadoop.mapreduce;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationReportPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationResourceUsageReportPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.QueueInfoPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
+import org.apache.hadoop.yarn.api.records.QueueState;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -33,6 +39,31 @@ import org.junit.Test;
public class TestTypeConverter {
@Test
+ public void testEnums() throws Exception {
+ for (YarnApplicationState applicationState : YarnApplicationState.values()) {
+ TypeConverter.fromYarn(applicationState);
+ }
+
+ for (TaskType taskType : TaskType.values()) {
+ TypeConverter.fromYarn(taskType);
+ }
+
+ for (JobState jobState : JobState.values()) {
+ TypeConverter.fromYarn(jobState);
+ }
+
+ for (QueueState queueState : QueueState.values()) {
+ TypeConverter.fromYarn(queueState);
+ }
+
+ for (TaskState taskState : TaskState.values()) {
+ TypeConverter.fromYarn(taskState);
+ }
+
+
+ }
+
+ @Test
public void testFromYarn() throws Exception {
int appStartTime = 612354;
YarnApplicationState state = YarnApplicationState.RUNNING;
@@ -42,6 +73,15 @@ public class TestTypeConverter {
applicationReport.setYarnApplicationState(state);
applicationReport.setStartTime(appStartTime);
applicationReport.setUser("TestTypeConverter-user");
+ ApplicationResourceUsageReportPBImpl appUsageRpt = new ApplicationResourceUsageReportPBImpl();
+ ResourcePBImpl r = new ResourcePBImpl();
+ r.setMemory(2048);
+ appUsageRpt.setNeededResources(r);
+ appUsageRpt.setNumReservedContainers(1);
+ appUsageRpt.setNumUsedContainers(3);
+ appUsageRpt.setReservedResources(r);
+ appUsageRpt.setUsedResources(r);
+ applicationReport.setApplicationResourceUsageReport(appUsageRpt);
JobStatus jobStatus = TypeConverter.fromYarn(applicationReport, "dummy-jobfile");
Assert.assertEquals(appStartTime, jobStatus.getStartTime());
Assert.assertEquals(state.toString(), jobStatus.getState().toString());
@@ -60,6 +100,15 @@ public class TestTypeConverter {
when(mockReport.getUser()).thenReturn("dummy-user");
when(mockReport.getQueue()).thenReturn("dummy-queue");
String jobFile = "dummy-path/job.xml";
+ ApplicationResourceUsageReportPBImpl appUsageRpt = new ApplicationResourceUsageReportPBImpl();
+ ResourcePBImpl r = new ResourcePBImpl();
+ r.setMemory(2048);
+ appUsageRpt.setNeededResources(r);
+ appUsageRpt.setNumReservedContainers(1);
+ appUsageRpt.setNumUsedContainers(3);
+ appUsageRpt.setReservedResources(r);
+ appUsageRpt.setUsedResources(r);
+ when(mockReport.getApplicationResourceUsageReport()).thenReturn(appUsageRpt);
JobStatus status = TypeConverter.fromYarn(mockReport, jobFile);
Assert.assertNotNull("fromYarn returned null status", status);
Assert.assertEquals("jobFile set incorrectly", "dummy-path/job.xml", status.getJobFile());
@@ -69,6 +118,11 @@ public class TestTypeConverter {
Assert.assertEquals("schedulingInfo set incorrectly", "dummy-tracking-url", status.getSchedulingInfo());
Assert.assertEquals("jobId set incorrectly", 6789, status.getJobID().getId());
Assert.assertEquals("state set incorrectly", JobStatus.State.KILLED, status.getState());
+ Assert.assertEquals("needed mem info set incorrectly", 2048, status.getNeededMem());
+ Assert.assertEquals("num rsvd slots info set incorrectly", 1, status.getNumReservedSlots());
+ Assert.assertEquals("num used slots info set incorrectly", 3, status.getNumUsedSlots());
+ Assert.assertEquals("rsvd mem info set incorrectly", 2048, status.getReservedMem());
+ Assert.assertEquals("used mem info set incorrectly", 2048, status.getUsedMem());
}
@Test