Posted to mapreduce-commits@hadoop.apache.org by tg...@apache.org on 2013/04/05 17:43:53 UTC
svn commit: r1465017 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/
hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/...
Author: tgraves
Date: Fri Apr 5 15:43:53 2013
New Revision: 1465017
URL: http://svn.apache.org/r1465017
Log:
MAPREDUCE-5007. fix coverage org.apache.hadoop.mapreduce.v2.hs (Aleksey Gorshkov via tgraves)
Added:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestCompletedTask.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1465017&r1=1465016&r2=1465017&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Fri Apr 5 15:43:53 2013
@@ -773,6 +773,9 @@ Release 0.23.7 - UNRELEASED
MAPREDUCE-4991. coverage for gridmix (Aleksey Gorshkov via tgraves)
+ MAPREDUCE-5007. fix coverage org.apache.hadoop.mapreduce.v2.hs (Aleksey
+ Gorshkov via tgraves)
+
Release 0.23.6 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java?rev=1465017&r1=1465016&r2=1465017&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java Fri Apr 5 15:43:53 2013
@@ -869,4 +869,9 @@ public class HistoryFileManager extends
}
}
}
+ // test hook: allows tests to override the maximum history file age
+ @VisibleForTesting
+ void setMaxHistoryAge(long newValue) {
+ maxHistoryAge = newValue;
+ }
}
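The new @VisibleForTesting hook makes the retention window controllable from tests; a minimal sketch of the intended usage, mirroring the testDeleteFileInfo test added below (conf stands for an assumed, already-populated Configuration):

    // Sketch: force HistoryFileManager to treat every history file as expired.
    HistoryFileManager hfm = new HistoryFileManager();
    hfm.init(conf);           // conf: assumed, already-populated Configuration
    hfm.setMaxHistoryAge(-1); // a negative max age expires every file
    hfm.clean();              // expired history files are now deleted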
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestCompletedTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestCompletedTask.java?rev=1465017&r1=1465016&r2=1465017&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestCompletedTask.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestCompletedTask.java Fri Apr 5 15:43:53 2013
@@ -21,46 +21,75 @@ package org.apache.hadoop.mapreduce.v2.h
import java.util.Map;
import java.util.TreeMap;
+import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
import org.apache.hadoop.mapreduce.v2.hs.CompletedTask;
-import org.junit.Assert;
import org.junit.Test;
-import org.mockito.Mockito;
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.*;
public class TestCompletedTask{
- @Test
+ @Test (timeout=5000)
public void testTaskStartTimes() {
- TaskId taskId = Mockito.mock(TaskId.class);
- TaskInfo taskInfo = Mockito.mock(TaskInfo.class);
+ TaskId taskId = mock(TaskId.class);
+ TaskInfo taskInfo = mock(TaskInfo.class);
Map<TaskAttemptID, TaskAttemptInfo> taskAttempts
= new TreeMap<TaskAttemptID, TaskAttemptInfo>();
TaskAttemptID id = new TaskAttemptID("0", 0, TaskType.MAP, 0, 0);
- TaskAttemptInfo info = Mockito.mock(TaskAttemptInfo.class);
- Mockito.when(info.getAttemptId()).thenReturn(id);
- Mockito.when(info.getStartTime()).thenReturn(10l);
+ TaskAttemptInfo info = mock(TaskAttemptInfo.class);
+ when(info.getAttemptId()).thenReturn(id);
+ when(info.getStartTime()).thenReturn(10L);
taskAttempts.put(id, info);
id = new TaskAttemptID("1", 0, TaskType.MAP, 1, 1);
- info = Mockito.mock(TaskAttemptInfo.class);
- Mockito.when(info.getAttemptId()).thenReturn(id);
- Mockito.when(info.getStartTime()).thenReturn(20l);
+ info = mock(TaskAttemptInfo.class);
+ when(info.getAttemptId()).thenReturn(id);
+ when(info.getStartTime()).thenReturn(20L);
taskAttempts.put(id, info);
- Mockito.when(taskInfo.getAllTaskAttempts()).thenReturn(taskAttempts);
+ when(taskInfo.getAllTaskAttempts()).thenReturn(taskAttempts);
CompletedTask task = new CompletedTask(taskId, taskInfo);
TaskReport report = task.getReport();
// Make sure the startTime returned by report is the lesser of the
// attempt launch times
- Assert.assertTrue(report.getStartTime() == 10);
+ assertTrue(report.getStartTime() == 10);
+ }
+ /**
+ * Test several getters of CompletedTaskAttempt.
+ */
+ @Test (timeout=5000)
+ public void testCompletedTaskAttempt(){
+
+ TaskAttemptInfo attemptInfo = mock(TaskAttemptInfo.class);
+ when(attemptInfo.getRackname()).thenReturn("Rackname");
+ when(attemptInfo.getShuffleFinishTime()).thenReturn(11L);
+ when(attemptInfo.getSortFinishTime()).thenReturn(12L);
+ when(attemptInfo.getShufflePort()).thenReturn(10);
+
+ JobID jobId = new JobID("12345", 0);
+ TaskID taskId = new TaskID(jobId, TaskType.REDUCE, 0);
+ TaskAttemptID taskAttemptId = new TaskAttemptID(taskId, 0);
+ when(attemptInfo.getAttemptId()).thenReturn(taskAttemptId);
+
+ CompletedTaskAttempt taskAttempt = new CompletedTaskAttempt(null, attemptInfo);
+ assertEquals("Rackname", taskAttempt.getNodeRackName());
+ assertEquals(Phase.CLEANUP, taskAttempt.getPhase());
+ assertTrue(taskAttempt.isFinished());
+ assertEquals(11L, taskAttempt.getShuffleFinishTime());
+ assertEquals(12L, taskAttempt.getSortFinishTime());
+ assertEquals(10, taskAttempt.getShufflePort());
}
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java?rev=1465017&r1=1465016&r2=1465017&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java Fri Apr 5 15:43:53 2013
@@ -45,7 +45,9 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
+import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
@RunWith(value = Parameterized.class)
@@ -79,7 +81,7 @@ public class TestJobHistoryEntities {
}
/* Verify some expected values based on the history file */
- @Test (timeout=10000)
+ @Test (timeout=100000)
public void testCompletedJob() throws Exception {
HistoryFileInfo info = mock(HistoryFileInfo.class);
when(info.getConfFile()).thenReturn(fullConfPath);
@@ -168,4 +170,45 @@ public class TestJobHistoryEntities {
assertEquals(45454, rta1Report.getNodeManagerPort());
assertEquals(9999, rta1Report.getNodeManagerHttpPort());
}
+ /**
+ * Verify several getters of CompletedJob, including the windowing of
+ * task attempt completion events.
+ * @throws Exception
+ */
+ @Test (timeout=30000)
+ public void testGetTaskAttemptCompletionEvent() throws Exception {
+ HistoryFileInfo info = mock(HistoryFileInfo.class);
+ when(info.getConfFile()).thenReturn(fullConfPath);
+ completedJob =
+ new CompletedJob(conf, jobId, fulleHistoryPath, loadTasks, "user",
+ info, jobAclsManager);
+ TaskCompletionEvent[] events = completedJob.getMapAttemptCompletionEvents(0, 1000);
+ assertEquals(10, completedJob.getMapAttemptCompletionEvents(0, 10).length);
+ // event ids must be non-decreasing
+ int currentEventId = 0;
+ for (TaskCompletionEvent taskAttemptCompletionEvent : events) {
+ int eventId = taskAttemptCompletionEvent.getEventId();
+ assertTrue(eventId >= currentEventId);
+ currentEventId = eventId;
+ }
+ assertNull(completedJob.loadConfFile());
+ // job name
+ assertEquals("Sleep job", completedJob.getName());
+ // queue name
+ assertEquals("default", completedJob.getQueueName());
+ // progress
+ assertEquals(1.0, completedJob.getProgress(), 0.001);
+ // 11 completion events in total
+ assertEquals(11, completedJob.getTaskAttemptCompletionEvents(0, 1000).length);
+ // the first 10 events
+ assertEquals(10, completedJob.getTaskAttemptCompletionEvents(0, 10).length);
+ // events 5 through 10, inclusive of the 5th: 6 events
+ assertEquals(6, completedJob.getTaskAttemptCompletionEvents(5, 10).length);
+
+ // no errors, so the diagnostics hold a single empty string
+ assertEquals(1, completedJob.getDiagnostics().size());
+ assertEquals("", completedJob.getDiagnostics().get(0));
+
+ assertEquals(0, completedJob.getJobACLs().size());
+
+ }
+
}
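For reference, getTaskAttemptCompletionEvents(fromEventId, maxEvents) returns at most maxEvents events starting at fromEventId; the window assertions above exercise that slicing against the 11 events in the sample history file. A minimal sketch of the expected arithmetic (the helper below is illustrative only, not part of CompletedJob):

    // Hypothetical helper: size of the [fromEventId, fromEventId + maxEvents) window.
    static int expectedWindowLength(int total, int fromEventId, int maxEvents) {
      return Math.max(0, Math.min(total, fromEventId + maxEvents) - fromEventId);
    }
    // expectedWindowLength(11, 0, 1000) == 11
    // expectedWindowLength(11, 0, 10)   == 10
    // expectedWindowLength(11, 5, 10)   == 6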
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java?rev=1465017&r1=1465016&r2=1465017&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java Fri Apr 5 15:43:53 2013
@@ -19,6 +19,9 @@
package org.apache.hadoop.mapreduce.v2.hs;
import java.io.ByteArrayOutputStream;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
import java.io.IOException;
import java.io.PrintStream;
import java.util.Arrays;
@@ -54,6 +57,9 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.JobIdPBImpl;
+import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.TaskIdPBImpl;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
@@ -65,7 +71,9 @@ import org.apache.hadoop.mapreduce.v2.hs
import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
+import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo;
import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.util.BuilderUtils;
@@ -80,12 +88,12 @@ public class TestJobHistoryParsing {
private static final String RACK_NAME = "/MyRackName";
- private ByteArrayOutputStream outContent = new ByteArrayOutputStream();
+ private ByteArrayOutputStream outContent = new ByteArrayOutputStream();
public static class MyResolver implements DNSToSwitchMapping {
@Override
public List<String> resolve(List<String> names) {
- return Arrays.asList(new String[]{RACK_NAME});
+ return Arrays.asList(new String[] { RACK_NAME });
}
@Override
@@ -93,14 +101,14 @@ public class TestJobHistoryParsing {
}
}
- @Test (timeout=50000)
+ @Test(timeout = 50000)
public void testJobInfo() throws Exception {
JobInfo info = new JobInfo();
Assert.assertEquals("NORMAL", info.getPriority());
info.printAll();
}
- @Test (timeout=50000)
+ @Test(timeout = 300000)
public void testHistoryParsing() throws Exception {
LOG.info("STARTING testHistoryParsing()");
try {
@@ -109,8 +117,8 @@ public class TestJobHistoryParsing {
LOG.info("FINISHED testHistoryParsing()");
}
}
-
- @Test (timeout=50000)
+
+ @Test(timeout = 50000)
public void testHistoryParsingWithParseErrors() throws Exception {
LOG.info("STARTING testHistoryParsingWithParseErrors()");
try {
@@ -119,18 +127,18 @@ public class TestJobHistoryParsing {
LOG.info("FINISHED testHistoryParsingWithParseErrors()");
}
}
-
- private static String getJobSummary(FileContext fc, Path path) throws IOException {
+
+ private static String getJobSummary(FileContext fc, Path path)
+ throws IOException {
Path qPath = fc.makeQualified(path);
FSDataInputStream in = fc.open(qPath);
String jobSummaryString = in.readUTF();
in.close();
return jobSummaryString;
}
-
+
private void checkHistoryParsing(final int numMaps, final int numReduces,
- final int numSuccessfulMaps)
- throws Exception {
+ final int numSuccessfulMaps) throws Exception {
Configuration conf = new Configuration();
conf.set(MRJobConfig.USER_NAME, System.getProperty("user.name"));
long amStartTimeEst = System.currentTimeMillis();
@@ -138,9 +146,8 @@ public class TestJobHistoryParsing {
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
MyResolver.class, DNSToSwitchMapping.class);
RackResolver.init(conf);
- MRApp app =
- new MRAppWithHistory(numMaps, numReduces, true,
- this.getClass().getName(), true);
+ MRApp app = new MRAppWithHistory(numMaps, numReduces, true, this.getClass()
+ .getName(), true);
app.submit(conf);
Job job = app.getContext().getAllJobs().values().iterator().next();
JobId jobId = job.getID();
@@ -152,7 +159,7 @@ public class TestJobHistoryParsing {
String jobhistoryDir = JobHistoryUtils
.getHistoryIntermediateDoneDirForUser(conf);
-
+
FileContext fc = null;
try {
fc = FileContext.getFileContext(conf);
@@ -160,7 +167,7 @@ public class TestJobHistoryParsing {
LOG.info("Can not get FileContext", ioe);
throw (new Exception("Can not get File Context"));
}
-
+
if (numMaps == numSuccessfulMaps) {
String summaryFileName = JobHistoryUtils
.getIntermediateSummaryFileName(jobId);
@@ -185,20 +192,22 @@ public class TestJobHistoryParsing {
Long.parseLong(jobSummaryElements.get("submitTime")) != 0);
Assert.assertTrue("launchTime should not be 0",
Long.parseLong(jobSummaryElements.get("launchTime")) != 0);
- Assert.assertTrue("firstMapTaskLaunchTime should not be 0",
- Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0);
Assert
- .assertTrue(
- "firstReduceTaskLaunchTime should not be 0",
- Long.parseLong(jobSummaryElements.get("firstReduceTaskLaunchTime")) != 0);
+ .assertTrue(
+ "firstMapTaskLaunchTime should not be 0",
+ Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0);
+ Assert
+ .assertTrue("firstReduceTaskLaunchTime should not be 0",
+ Long.parseLong(jobSummaryElements
+ .get("firstReduceTaskLaunchTime")) != 0);
Assert.assertTrue("finishTime should not be 0",
Long.parseLong(jobSummaryElements.get("finishTime")) != 0);
Assert.assertEquals("Mismatch in num map slots", numSuccessfulMaps,
Integer.parseInt(jobSummaryElements.get("numMaps")));
Assert.assertEquals("Mismatch in num reduce slots", numReduces,
Integer.parseInt(jobSummaryElements.get("numReduces")));
- Assert.assertEquals("User does not match", System.getProperty("user.name"),
- jobSummaryElements.get("user"));
+ Assert.assertEquals("User does not match",
+ System.getProperty("user.name"), jobSummaryElements.get("user"));
Assert.assertEquals("Queue does not match", "default",
jobSummaryElements.get("queue"));
Assert.assertEquals("Status does not match", "SUCCEEDED",
@@ -210,8 +219,8 @@ public class TestJobHistoryParsing {
HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId);
JobInfo jobInfo;
long numFinishedMaps;
-
- synchronized(fileInfo) {
+
+ synchronized (fileInfo) {
Path historyFilePath = fileInfo.getHistoryFile();
FSDataInputStream in = null;
LOG.info("JobHistoryFile is: " + historyFilePath);
@@ -228,11 +237,11 @@ public class TestJobHistoryParsing {
if (numMaps == numSuccessfulMaps) {
reader = realReader;
} else {
- final AtomicInteger numFinishedEvents = new AtomicInteger(0); // Hack!
+ final AtomicInteger numFinishedEvents = new AtomicInteger(0); // Hack!
Mockito.when(reader.getNextEvent()).thenAnswer(
new Answer<HistoryEvent>() {
- public HistoryEvent answer(InvocationOnMock invocation)
- throws IOException {
+ public HistoryEvent answer(InvocationOnMock invocation)
+ throws IOException {
HistoryEvent event = realReader.getNextEvent();
if (event instanceof TaskFinishedEvent) {
numFinishedEvents.incrementAndGet();
@@ -244,22 +253,20 @@ public class TestJobHistoryParsing {
throw new IOException("test");
}
}
- }
- );
+ });
}
jobInfo = parser.parse(reader);
- numFinishedMaps =
- computeFinishedMaps(jobInfo, numMaps, numSuccessfulMaps);
+ numFinishedMaps = computeFinishedMaps(jobInfo, numMaps, numSuccessfulMaps);
if (numFinishedMaps != numMaps) {
Exception parseException = parser.getParseException();
- Assert.assertNotNull("Didn't get expected parse exception",
+ Assert.assertNotNull("Didn't get expected parse exception",
parseException);
}
}
-
+
Assert.assertEquals("Incorrect username ", System.getProperty("user.name"),
jobInfo.getUsername());
Assert.assertEquals("Incorrect jobName ", "test", jobInfo.getJobname());
@@ -267,7 +274,7 @@ public class TestJobHistoryParsing {
jobInfo.getJobQueueName());
Assert
.assertEquals("incorrect conf path", "test", jobInfo.getJobConfPath());
- Assert.assertEquals("incorrect finishedMap ", numSuccessfulMaps,
+ Assert.assertEquals("incorrect finishedMap ", numSuccessfulMaps,
numFinishedMaps);
Assert.assertEquals("incorrect finishedReduces ", numReduces,
jobInfo.getFinishedReduces());
@@ -275,8 +282,8 @@ public class TestJobHistoryParsing {
jobInfo.getUberized());
Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks();
int totalTasks = allTasks.size();
- Assert.assertEquals("total number of tasks is incorrect ",
- (numMaps+numReduces), totalTasks);
+ Assert.assertEquals("total number of tasks is incorrect ",
+ (numMaps + numReduces), totalTasks);
// Verify aminfo
Assert.assertEquals(1, jobInfo.getAMInfos().size());
@@ -306,8 +313,7 @@ public class TestJobHistoryParsing {
// Deep compare Job and JobInfo
for (Task task : job.getTasks().values()) {
- TaskInfo taskInfo = allTasks.get(
- TypeConverter.fromYarn(task.getID()));
+ TaskInfo taskInfo = allTasks.get(TypeConverter.fromYarn(task.getID()));
Assert.assertNotNull("TaskInfo not found", taskInfo);
for (TaskAttempt taskAttempt : task.getAttempts().values()) {
TaskAttemptInfo taskAttemptInfo = taskInfo.getAllTaskAttempts().get(
@@ -318,27 +324,32 @@ public class TestJobHistoryParsing {
if (numMaps == numSuccessfulMaps) {
Assert.assertEquals(MRApp.NM_HOST, taskAttemptInfo.getHostname());
Assert.assertEquals(MRApp.NM_PORT, taskAttemptInfo.getPort());
-
+
// Verify rack-name
- Assert.assertEquals("rack-name is incorrect", taskAttemptInfo
- .getRackname(), RACK_NAME);
+ Assert.assertEquals("rack-name is incorrect",
+ taskAttemptInfo.getRackname(), RACK_NAME);
}
}
}
-
+
// test output for HistoryViewer
- PrintStream stdps=System.out;
+ PrintStream stdps = System.out;
try {
System.setOut(new PrintStream(outContent));
HistoryViewer viewer = new HistoryViewer(fc.makeQualified(
fileInfo.getHistoryFile()).toString(), conf, true);
viewer.print();
-
- for (TaskInfo taskInfo : allTasks.values()) {
-
- String test= (taskInfo.getTaskStatus()==null?"":taskInfo.getTaskStatus())+" "+taskInfo.getTaskType()+" task list for "+taskInfo.getTaskId().getJobID();
- Assert.assertTrue(outContent.toString().indexOf(test)>0);
- Assert.assertTrue(outContent.toString().indexOf(taskInfo.getTaskId().toString())>0);
+
+ for (TaskInfo taskInfo : allTasks.values()) {
+
+ String test = (taskInfo.getTaskStatus() == null ? "" : taskInfo
+ .getTaskStatus())
+ + " "
+ + taskInfo.getTaskType()
+ + " task list for " + taskInfo.getTaskId().getJobID();
+ Assert.assertTrue(outContent.toString().indexOf(test) > 0);
+ Assert.assertTrue(outContent.toString().indexOf(
+ taskInfo.getTaskId().toString()) > 0);
}
} finally {
System.setOut(stdps);
@@ -363,186 +374,180 @@ public class TestJobHistoryParsing {
}
return numFinishedMaps;
}
-
- @Test (timeout=50000)
+
+ @Test(timeout = 30000)
public void testHistoryParsingForFailedAttempts() throws Exception {
LOG.info("STARTING testHistoryParsingForFailedAttempts");
try {
- Configuration conf = new Configuration();
- conf
- .setClass(
- CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
- MyResolver.class, DNSToSwitchMapping.class);
- RackResolver.init(conf);
- MRApp app = new MRAppWithHistoryWithFailedAttempt(2, 1, true, this.getClass().getName(),
- true);
- app.submit(conf);
- Job job = app.getContext().getAllJobs().values().iterator().next();
- JobId jobId = job.getID();
- app.waitForState(job, JobState.SUCCEEDED);
-
- // make sure all events are flushed
- app.waitForState(Service.STATE.STOPPED);
-
- String jobhistoryDir = JobHistoryUtils
- .getHistoryIntermediateDoneDirForUser(conf);
- JobHistory jobHistory = new JobHistory();
- jobHistory.init(conf);
+ Configuration conf = new Configuration();
+ conf.setClass(
+ CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+ MyResolver.class, DNSToSwitchMapping.class);
+ RackResolver.init(conf);
+ MRApp app = new MRAppWithHistoryWithFailedAttempt(2, 1, true, this
+ .getClass().getName(), true);
+ app.submit(conf);
+ Job job = app.getContext().getAllJobs().values().iterator().next();
+ JobId jobId = job.getID();
+ app.waitForState(job, JobState.SUCCEEDED);
+
+ // make sure all events are flushed
+ app.waitForState(Service.STATE.STOPPED);
+
+ String jobhistoryDir = JobHistoryUtils
+ .getHistoryIntermediateDoneDirForUser(conf);
+ JobHistory jobHistory = new JobHistory();
+ jobHistory.init(conf);
+
+ JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId)
+ .getJobIndexInfo();
+ String jobhistoryFileName = FileNameIndexUtils
+ .getDoneFileName(jobIndexInfo);
- JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId)
- .getJobIndexInfo();
- String jobhistoryFileName = FileNameIndexUtils
- .getDoneFileName(jobIndexInfo);
-
- Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
- FSDataInputStream in = null;
- FileContext fc = null;
- try {
- fc = FileContext.getFileContext(conf);
- in = fc.open(fc.makeQualified(historyFilePath));
- } catch (IOException ioe) {
- LOG.info("Can not open history file: " + historyFilePath, ioe);
- throw (new Exception("Can not open History File"));
- }
+ Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
+ FSDataInputStream in = null;
+ FileContext fc = null;
+ try {
+ fc = FileContext.getFileContext(conf);
+ in = fc.open(fc.makeQualified(historyFilePath));
+ } catch (IOException ioe) {
+ LOG.info("Can not open history file: " + historyFilePath, ioe);
+ throw (new Exception("Can not open History File"));
+ }
- JobHistoryParser parser = new JobHistoryParser(in);
- JobInfo jobInfo = parser.parse();
- Exception parseException = parser.getParseException();
- Assert.assertNull("Caught an expected exception " + parseException,
- parseException);
- int noOffailedAttempts = 0;
- Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks();
- for (Task task : job.getTasks().values()) {
- TaskInfo taskInfo = allTasks.get(TypeConverter.fromYarn(task.getID()));
- for (TaskAttempt taskAttempt : task.getAttempts().values()) {
- TaskAttemptInfo taskAttemptInfo = taskInfo.getAllTaskAttempts().get(
- TypeConverter.fromYarn((taskAttempt.getID())));
- // Verify rack-name for all task attempts
- Assert.assertEquals("rack-name is incorrect", taskAttemptInfo
- .getRackname(), RACK_NAME);
- if (taskAttemptInfo.getTaskStatus().equals("FAILED")) {
- noOffailedAttempts++;
+ JobHistoryParser parser = new JobHistoryParser(in);
+ JobInfo jobInfo = parser.parse();
+ Exception parseException = parser.getParseException();
+ Assert.assertNull("Caught an expected exception " + parseException,
+ parseException);
+ int noOffailedAttempts = 0;
+ Map<TaskID, TaskInfo> allTasks = jobInfo.getAllTasks();
+ for (Task task : job.getTasks().values()) {
+ TaskInfo taskInfo = allTasks.get(TypeConverter.fromYarn(task.getID()));
+ for (TaskAttempt taskAttempt : task.getAttempts().values()) {
+ TaskAttemptInfo taskAttemptInfo = taskInfo.getAllTaskAttempts().get(
+ TypeConverter.fromYarn((taskAttempt.getID())));
+ // Verify rack-name for all task attempts
+ Assert.assertEquals("rack-name is incorrect",
+ taskAttemptInfo.getRackname(), RACK_NAME);
+ if (taskAttemptInfo.getTaskStatus().equals("FAILED")) {
+ noOffailedAttempts++;
+ }
}
}
- }
- Assert.assertEquals("No of Failed tasks doesn't match.", 2, noOffailedAttempts);
+ Assert.assertEquals("No of Failed tasks doesn't match.", 2,
+ noOffailedAttempts);
} finally {
LOG.info("FINISHED testHistoryParsingForFailedAttempts");
}
}
-
- @Test (timeout=5000)
+
+ @Test(timeout = 60000)
public void testCountersForFailedTask() throws Exception {
LOG.info("STARTING testCountersForFailedTask");
try {
- Configuration conf = new Configuration();
- conf
- .setClass(
- CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
- MyResolver.class, DNSToSwitchMapping.class);
- RackResolver.init(conf);
- MRApp app = new MRAppWithHistoryWithFailedTask(2, 1, true,
- this.getClass().getName(), true);
- app.submit(conf);
- Job job = app.getContext().getAllJobs().values().iterator().next();
- JobId jobId = job.getID();
- app.waitForState(job, JobState.FAILED);
-
- // make sure all events are flushed
- app.waitForState(Service.STATE.STOPPED);
+ Configuration conf = new Configuration();
+ conf.setClass(
+ CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+ MyResolver.class, DNSToSwitchMapping.class);
+ RackResolver.init(conf);
+ MRApp app = new MRAppWithHistoryWithFailedTask(2, 1, true, this
+ .getClass().getName(), true);
+ app.submit(conf);
+ Job job = app.getContext().getAllJobs().values().iterator().next();
+ JobId jobId = job.getID();
+ app.waitForState(job, JobState.FAILED);
+
+ // make sure all events are flushed
+ app.waitForState(Service.STATE.STOPPED);
+
+ String jobhistoryDir = JobHistoryUtils
+ .getHistoryIntermediateDoneDirForUser(conf);
+ JobHistory jobHistory = new JobHistory();
+ jobHistory.init(conf);
+
+ JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId)
+ .getJobIndexInfo();
+ String jobhistoryFileName = FileNameIndexUtils
+ .getDoneFileName(jobIndexInfo);
- String jobhistoryDir = JobHistoryUtils
- .getHistoryIntermediateDoneDirForUser(conf);
- JobHistory jobHistory = new JobHistory();
- jobHistory.init(conf);
-
- JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId)
- .getJobIndexInfo();
- String jobhistoryFileName = FileNameIndexUtils
- .getDoneFileName(jobIndexInfo);
-
- Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
- FSDataInputStream in = null;
- FileContext fc = null;
- try {
- fc = FileContext.getFileContext(conf);
- in = fc.open(fc.makeQualified(historyFilePath));
- } catch (IOException ioe) {
- LOG.info("Can not open history file: " + historyFilePath, ioe);
- throw (new Exception("Can not open History File"));
- }
+ Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName);
+ FSDataInputStream in = null;
+ FileContext fc = null;
+ try {
+ fc = FileContext.getFileContext(conf);
+ in = fc.open(fc.makeQualified(historyFilePath));
+ } catch (IOException ioe) {
+ LOG.info("Can not open history file: " + historyFilePath, ioe);
+ throw (new Exception("Can not open History File"));
+ }
- JobHistoryParser parser = new JobHistoryParser(in);
- JobInfo jobInfo = parser.parse();
- Exception parseException = parser.getParseException();
- Assert.assertNull("Caught an expected exception " + parseException,
- parseException);
- for (Map.Entry<TaskID,TaskInfo> entry : jobInfo.getAllTasks().entrySet()) {
- TaskId yarnTaskID = TypeConverter.toYarn(entry.getKey());
- CompletedTask ct = new CompletedTask(yarnTaskID, entry.getValue());
- Assert.assertNotNull("completed task report has null counters",
- ct.getReport().getCounters());
- //Make sure all the completedTask has counters, and the counters are not empty
- Assert.assertTrue(ct.getReport().getCounters()
- .getAllCounterGroups().size() > 0);
- }
+ JobHistoryParser parser = new JobHistoryParser(in);
+ JobInfo jobInfo = parser.parse();
+ Exception parseException = parser.getParseException();
+ Assert.assertNull("Caught an expected exception " + parseException,
+ parseException);
+ for (Map.Entry<TaskID, TaskInfo> entry : jobInfo.getAllTasks().entrySet()) {
+ TaskId yarnTaskID = TypeConverter.toYarn(entry.getKey());
+ CompletedTask ct = new CompletedTask(yarnTaskID, entry.getValue());
+ Assert.assertNotNull("completed task report has null counters", ct
+ .getReport().getCounters());
+ }
} finally {
LOG.info("FINISHED testCountersForFailedTask");
}
}
- @Test (timeout=50000)
+ @Test(timeout = 50000)
public void testScanningOldDirs() throws Exception {
LOG.info("STARTING testScanningOldDirs");
try {
- Configuration conf = new Configuration();
- conf
- .setClass(
- CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
- MyResolver.class, DNSToSwitchMapping.class);
- RackResolver.init(conf);
- MRApp app =
- new MRAppWithHistory(1, 1, true,
- this.getClass().getName(), true);
- app.submit(conf);
- Job job = app.getContext().getAllJobs().values().iterator().next();
- JobId jobId = job.getID();
- LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
- app.waitForState(job, JobState.SUCCEEDED);
-
- // make sure all events are flushed
- app.waitForState(Service.STATE.STOPPED);
+ Configuration conf = new Configuration();
+ conf.setClass(
+ CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+ MyResolver.class, DNSToSwitchMapping.class);
+ RackResolver.init(conf);
+ MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(),
+ true);
+ app.submit(conf);
+ Job job = app.getContext().getAllJobs().values().iterator().next();
+ JobId jobId = job.getID();
+ LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
+ app.waitForState(job, JobState.SUCCEEDED);
+
+ // make sure all events are flushed
+ app.waitForState(Service.STATE.STOPPED);
+
+ HistoryFileManagerForTest hfm = new HistoryFileManagerForTest();
+ hfm.init(conf);
+ HistoryFileInfo fileInfo = hfm.getFileInfo(jobId);
+ Assert.assertNotNull("Unable to locate job history", fileInfo);
+
+ // force the manager to "forget" the job
+ hfm.deleteJobFromJobListCache(fileInfo);
+ final int msecPerSleep = 10;
+ int msecToSleep = 10 * 1000;
+ while (fileInfo.isMovePending() && msecToSleep > 0) {
+ Assert.assertTrue(!fileInfo.didMoveFail());
+ msecToSleep -= msecPerSleep;
+ Thread.sleep(msecPerSleep);
+ }
+ Assert.assertTrue("Timeout waiting for history move", msecToSleep > 0);
- HistoryFileManagerForTest hfm = new HistoryFileManagerForTest();
- hfm.init(conf);
- HistoryFileInfo fileInfo = hfm.getFileInfo(jobId);
- Assert.assertNotNull("Unable to locate job history", fileInfo);
-
- // force the manager to "forget" the job
- hfm.deleteJobFromJobListCache(fileInfo);
- final int msecPerSleep = 10;
- int msecToSleep = 10 * 1000;
- while (fileInfo.isMovePending() && msecToSleep > 0) {
- Assert.assertTrue(!fileInfo.didMoveFail());
- msecToSleep -= msecPerSleep;
- Thread.sleep(msecPerSleep);
- }
- Assert.assertTrue("Timeout waiting for history move", msecToSleep > 0);
-
- fileInfo = hfm.getFileInfo(jobId);
- Assert.assertNotNull("Unable to locate old job history", fileInfo);
- } finally {
+ fileInfo = hfm.getFileInfo(jobId);
+ Assert.assertNotNull("Unable to locate old job history", fileInfo);
+ } finally {
LOG.info("FINISHED testScanningOldDirs");
}
}
static class MRAppWithHistoryWithFailedAttempt extends MRAppWithHistory {
- public MRAppWithHistoryWithFailedAttempt(int maps, int reduces, boolean autoComplete,
- String testName, boolean cleanOnStart) {
+ public MRAppWithHistoryWithFailedAttempt(int maps, int reduces,
+ boolean autoComplete, String testName, boolean cleanOnStart) {
super(maps, reduces, autoComplete, testName, cleanOnStart);
}
-
+
@SuppressWarnings("unchecked")
@Override
protected void attemptLaunched(TaskAttemptId attemptID) {
@@ -558,8 +563,8 @@ public class TestJobHistoryParsing {
static class MRAppWithHistoryWithFailedTask extends MRAppWithHistory {
- public MRAppWithHistoryWithFailedTask(int maps, int reduces, boolean autoComplete,
- String testName, boolean cleanOnStart) {
+ public MRAppWithHistoryWithFailedTask(int maps, int reduces,
+ boolean autoComplete, String testName, boolean cleanOnStart) {
super(maps, reduces, autoComplete, testName, cleanOnStart);
}
@@ -587,4 +592,133 @@ public class TestJobHistoryParsing {
t.testHistoryParsing();
t.testHistoryParsingForFailedAttempts();
}
+
+ /**
+ * Test cleaning of old history files. By default, files older than one
+ * week are deleted.
+ */
+ @Test(timeout = 15000)
+ public void testDeleteFileInfo() throws Exception {
+ LOG.info("STARTING testDeleteFileInfo");
+ try {
+ Configuration conf = new Configuration();
+
+ conf.setClass(
+ CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+ MyResolver.class, DNSToSwitchMapping.class);
+
+ RackResolver.init(conf);
+ MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(),
+ true);
+ app.submit(conf);
+ Job job = app.getContext().getAllJobs().values().iterator().next();
+ JobId jobId = job.getID();
+
+ app.waitForState(job, JobState.SUCCEEDED);
+
+ // make sure all events are flushed
+ app.waitForState(Service.STATE.STOPPED);
+
+ HistoryFileManager hfm = new HistoryFileManager();
+ hfm.init(conf);
+ HistoryFileInfo fileInfo = hfm.getFileInfo(jobId);
+ hfm.initExisting();
+ // wait for the history files to move from the intermediate done
+ // directory to the done directory
+ while (fileInfo.isMovePending()) {
+ Thread.sleep(300);
+ }
+
+ Assert.assertNotNull(hfm.jobListCache.values());
+
+ // try to remove fileInfo; it is still within the retention window
+ hfm.clean();
+ // check that fileInfo has not been deleted
+ Assert.assertFalse(fileInfo.isDeleted());
+ // set a negative max age so every file is expired
+ hfm.setMaxHistoryAge(-1);
+ hfm.clean();
+ // now the file should be deleted
+ Assert.assertTrue("file should be deleted", fileInfo.isDeleted());
+
+ } finally {
+ LOG.info("FINISHED testDeleteFileInfo");
+ }
+ }
+
+ /**
+ * Simple test of several JobHistory methods.
+ */
+ @Test(timeout = 20000)
+ public void testJobHistoryMethods() throws Exception {
+ LOG.info("STARTING testJobHistoryMethods");
+ try {
+ Configuration configuration = new Configuration();
+ configuration
+ .setClass(
+ CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+ MyResolver.class, DNSToSwitchMapping.class);
+
+ RackResolver.init(configuration);
+ MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(),
+ true);
+ app.submit(configuration);
+ Job job = app.getContext().getAllJobs().values().iterator().next();
+ JobId jobId = job.getID();
+ LOG.info("JOBID is " + TypeConverter.fromYarn(jobId).toString());
+ app.waitForState(job, JobState.SUCCEEDED);
+
+ JobHistory jobHistory = new JobHistory();
+ jobHistory.init(configuration);
+ // Method getAllJobs
+ Assert.assertEquals(1, jobHistory.getAllJobs().size());
+ // and with ApplicationId
+ Assert.assertEquals(1, jobHistory.getAllJobs(app.getAppID()).size());
+
+ JobsInfo jobsinfo = jobHistory.getPartialJobs(0L, 10L, null, "default",
+ 0L, System.currentTimeMillis() + 1, 0L,
+ System.currentTimeMillis() + 1, JobState.SUCCEEDED);
+
+ Assert.assertEquals(1, jobsinfo.getJobs().size());
+ Assert.assertNotNull(jobHistory.getApplicationAttemptId());
+ // test Application Id
+ Assert.assertEquals("application_0_0000", jobHistory.getApplicationID()
+ .toString());
+ Assert
+ .assertEquals("Job History Server", jobHistory.getApplicationName());
+ // getEventHandler is not supported and returns null
+ Assert.assertNull(jobHistory.getEventHandler());
+ // getClock is not supported and returns null
+ Assert.assertNull(jobHistory.getClock());
+ // getClusterInfo is not supported and returns null
+ Assert.assertNull(jobHistory.getClusterInfo());
+
+ } finally {
+ LOG.info("FINISHED testJobHistoryMethods");
+ }
+ }
+
+ /**
+ * Simple test of PartialJob.
+ */
+ @Test(timeout = 1000)
+ public void testPartialJob() throws Exception {
+ JobId jobId = new JobIdPBImpl();
+ jobId.setId(0);
+ JobIndexInfo jii = new JobIndexInfo(0L, System.currentTimeMillis(), "user",
+ "jobName", jobId, 3, 2, "JobStatus");
+ PartialJob test = new PartialJob(jii, jobId);
+
+ Assert.assertEquals(1.0f, test.getProgress(), 0.001f);
+ assertNull(test.getAllCounters());
+ assertNull(test.getTasks());
+ assertNull(test.getTasks(TaskType.MAP));
+ assertNull(test.getTask(new TaskIdPBImpl()));
+
+ assertNull(test.getTaskAttemptCompletionEvents(0, 100));
+ assertNull(test.getMapAttemptCompletionEvents(0, 100));
+ assertTrue(test.checkAccess(UserGroupInformation.getCurrentUser(), null));
+ assertNull(test.getAMInfos());
+
+ }
}
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java?rev=1465017&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java Fri Apr 5 15:43:53 2013
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.hs;
+
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompletionEventsResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.app.MRApp;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory;
+import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryParsing.MyResolver;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.service.Service;
+import org.apache.hadoop.yarn.service.Service.STATE;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for the JobHistoryServer and its client protocols.
+ */
+public class TestJobHistoryServer {
+ private static RecordFactory recordFactory = RecordFactoryProvider
+ .getRecordFactory(null);
+
+ JobHistoryServer historyServer = null;
+
+ // simple test of init/start/stop of the JobHistoryServer; the service
+ // state should change accordingly
+ @Test (timeout=50000)
+ public void testStartStopServer() throws Exception {
+
+ historyServer = new JobHistoryServer();
+ Configuration config = new Configuration();
+ historyServer.init(config);
+ assertEquals(STATE.INITED, historyServer.getServiceState());
+ assertEquals(3, historyServer.getServices().size());
+ historyServer.start();
+ assertEquals(STATE.STARTED, historyServer.getServiceState());
+ historyServer.stop();
+ assertEquals(STATE.STOPPED, historyServer.getServiceState());
+ assertNotNull(historyServer.getClientService());
+ HistoryClientService historyService = historyServer.getClientService();
+ assertNotNull(historyService.getClientHandler().getConnectAddress());
+ }
+
+ // Test reports from the JobHistoryServer: the server should pick up the
+ // history files written by MRApp and serve reports from them.
+ @Test (timeout=50000)
+ public void testReports() throws Exception {
+ Configuration config = new Configuration();
+ config
+ .setClass(
+ CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+ MyResolver.class, DNSToSwitchMapping.class);
+
+ RackResolver.init(config);
+ MRApp app = new MRAppWithHistory(1, 1, true, this.getClass().getName(),
+ true);
+ app.submit(config);
+ Job job = app.getContext().getAllJobs().values().iterator().next();
+ app.waitForState(job, JobState.SUCCEEDED);
+
+ historyServer = new JobHistoryServer();
+
+ historyServer.init(config);
+ historyServer.start();
+
+ // locate the JobHistory service among the server's services
+ JobHistory jobHistory = null;
+ for (Service service : historyServer.getServices()) {
+ if (service instanceof JobHistory) {
+ jobHistory = (JobHistory) service;
+ }
+ }
+
+ Map<JobId, Job> jobs = jobHistory.getAllJobs();
+
+ assertEquals(1, jobs.size());
+ assertEquals("job_0_0000", jobs.keySet().iterator().next().toString());
+
+ Task task = job.getTasks().values().iterator().next();
+ TaskAttempt attempt = task.getAttempts().values().iterator().next();
+
+ HistoryClientService historyService = historyServer.getClientService();
+ MRClientProtocol protocol = historyService.getClientHandler();
+
+ GetTaskAttemptReportRequest gtarRequest = recordFactory
+ .newRecordInstance(GetTaskAttemptReportRequest.class);
+ // test getTaskAttemptReport
+ TaskAttemptId taId = attempt.getID();
+ taId.setTaskId(task.getID());
+ taId.getTaskId().setJobId(job.getID());
+ gtarRequest.setTaskAttemptId(taId);
+ GetTaskAttemptReportResponse response = protocol
+ .getTaskAttemptReport(gtarRequest);
+ assertEquals("container_0_0000_01_000000", response.getTaskAttemptReport()
+ .getContainerId().toString());
+ assertTrue(response.getTaskAttemptReport().getDiagnosticInfo().isEmpty());
+ // counters
+ assertNotNull(response.getTaskAttemptReport().getCounters()
+ .getCounter(TaskCounter.PHYSICAL_MEMORY_BYTES));
+ assertEquals(taId.toString(), response.getTaskAttemptReport()
+ .getTaskAttemptId().toString());
+ // test getTaskReport
+ GetTaskReportRequest request = recordFactory
+ .newRecordInstance(GetTaskReportRequest.class);
+ TaskId taskId = task.getID();
+ taskId.setJobId(job.getID());
+ request.setTaskId(taskId);
+ GetTaskReportResponse reportResponse = protocol.getTaskReport(request);
+ assertEquals("", reportResponse.getTaskReport().getDiagnosticsList()
+ .iterator().next());
+ // progress
+ assertEquals(1.0f, reportResponse.getTaskReport().getProgress(), 0.01);
+ // the report carries the correct taskId
+ assertEquals(taskId.toString(), reportResponse.getTaskReport().getTaskId()
+ .toString());
+ // Task state should be SUCCEEDED
+ assertEquals(TaskState.SUCCEEDED, reportResponse.getTaskReport()
+ .getTaskState());
+ // test getTaskAttemptCompletionEvents
+ GetTaskAttemptCompletionEventsRequest taskAttemptRequest = recordFactory
+ .newRecordInstance(GetTaskAttemptCompletionEventsRequest.class);
+ taskAttemptRequest.setJobId(job.getID());
+ GetTaskAttemptCompletionEventsResponse taskAttemptCompletionEventsResponse = protocol
+ .getTaskAttemptCompletionEvents(taskAttemptRequest);
+ assertEquals(0, taskAttemptCompletionEventsResponse.getCompletionEventCount());
+
+ // test getDiagnostics
+ GetDiagnosticsRequest diagnosticRequest = recordFactory
+ .newRecordInstance(GetDiagnosticsRequest.class);
+ diagnosticRequest.setTaskAttemptId(taId);
+ GetDiagnosticsResponse diagnosticResponse = protocol
+ .getDiagnostics(diagnosticRequest);
+ // with no errors, the diagnostics list holds a single empty string
+ assertEquals(1, diagnosticResponse.getDiagnosticsCount());
+ assertEquals("", diagnosticResponse.getDiagnostics(0));
+
+ }
+ // test the main method; the server should start without calling System.exit
+ @Test (timeout=60000)
+ public void testMainMethod() throws Exception {
+ ExitUtil.disableSystemExit();
+ try {
+ JobHistoryServer.main(new String[0]);
+ } catch (ExitUtil.ExitException e) {
+ // main() is not expected to exit; any captured exit fails the test
+ assertEquals(0, e.status);
+ ExitUtil.resetFirstExitException();
+ fail();
+ }
+ }
+
+ @After
+ public void stop() {
+ if (historyServer != null
+ && !STATE.STOPPED.equals(historyServer.getServiceState())) {
+ historyServer.stop();
+ }
+ }
+}