You are viewing a plain-text version of this content; the link to its canonical version was lost in this rendering.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2012/01/31 11:46:30 UTC
svn commit: r1238385 - in /hadoop/common/branches/branch-1: ./
src/test/org/apache/hadoop/tools/rumen/
src/test/tools/data/rumen/small-trace-test/
src/tools/org/apache/hadoop/tools/rumen/
Author: ravigummadi
Date: Tue Jan 31 10:46:30 2012
New Revision: 1238385
URL: http://svn.apache.org/viewvc?rev=1238385&view=rev
Log:
MAPREDUCE-3597. [Rumen] Provide a way to access other info of history file from Rumen tool. (ravigummadi)
Added:
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/JobHistoryUtils.java
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/ParsedJob.java
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/ParsedTask.java
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/ParsedTaskAttempt.java
Modified:
hadoop/common/branches/branch-1/CHANGES.txt
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
hadoop/common/branches/branch-1/src/test/tools/data/rumen/small-trace-test/counters-test-trace.json.gz
hadoop/common/branches/branch-1/src/test/tools/data/rumen/small-trace-test/dispatch-sample-v20-jt-log.gz
hadoop/common/branches/branch-1/src/test/tools/data/rumen/small-trace-test/dispatch-trace-output.json.gz
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/JobSubmittedEvent.java
hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java
Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1238385&r1=1238384&r2=1238385&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Tue Jan 31 10:46:30 2012
@@ -91,6 +91,9 @@ Release 1.1.0 - unreleased
IMPROVEMENTS
+ MAPREDUCE-3597. [Rumen] Provide a way to access other info of history file
+ from Rumen tool. (ravigummadi)
+
MAPREDUCE-2517. Add system tests to Gridmix. (Vinay Thota via amarrk)
MAPREDUCE-3008. [Gridmix] Improve cumulative CPU usage emulation for
Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java?rev=1238385&r1=1238384&r2=1238385&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java (original)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/tools/rumen/TestRumenJobTraces.java Tue Jan 31 10:46:30 2012
@@ -29,6 +29,8 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
@@ -56,6 +58,8 @@ import org.junit.Test;
import static org.junit.Assert.*;
public class TestRumenJobTraces {
+ private static final Log LOG = LogFactory.getLog(TestRumenJobTraces.class);
+
@Test
public void testSmallTrace() throws Exception {
performSingleTest("sample-job-tracker-logs.gz",
@@ -204,6 +208,140 @@ public class TestRumenJobTraces {
goldPath, LoggedJob.class, "trace");
}
+ /**
+ * Validates the obtainXXX accessors of the given {@link ParsedJob} and then
+ * of each of its map, reduce and other {@link ParsedTask}s (with attempts).
+ */
+ private void validateParsedJob(ParsedJob parsedJob, int numMaps,
+ int numReduces, String queueName) {
+ validateParsedJobAPI(parsedJob, numMaps, numReduces, queueName);
+
+ // map tasks
+ for (ParsedTask mapTask : parsedJob.obtainMapTasks()) {
+ validateParsedTask(mapTask);
+ }
+ // reduce tasks
+ for (ParsedTask reduceTask : parsedJob.obtainReduceTasks()) {
+ validateParsedTask(reduceTask);
+ }
+ // tasks that are neither maps nor reduces
+ for (ParsedTask otherTask : parsedJob.obtainOtherTasks()) {
+ validateParsedTask(otherTask);
+ }
+ }
+
+ /** Verifies that the obtainXXX methods of {@link ParsedJob} return valid info. */
+ private void validateParsedJobAPI(ParsedJob parsedJob, int numMaps,
+ int numReduces, String queueName) {
+ LOG.info("Validating ParsedJob.obtainXXX api... for "
+ + parsedJob.getJobID());
+ assertNotNull("Job acls in ParsedJob is null",
+ parsedJob.obtainJobAcls());
+ assertNotNull("Job conf path in ParsedJob is null",
+ parsedJob.obtainJobConfpath());
+ assertEquals("Job queue in ParsedJob is wrong",
+ queueName, parsedJob.getQueue());
+
+ assertNotNull("Map Counters in ParsedJob is null",
+ parsedJob.obtainMapCounters());
+ assertNotNull("Reduce Counters in ParsedJob is null",
+ parsedJob.obtainReduceCounters());
+ assertNotNull("Total Counters in ParsedJob is null",
+ parsedJob.obtainTotalCounters());
+
+ assertNotNull("Map Tasks List in ParsedJob is null",
+ parsedJob.obtainMapTasks());
+ assertNotNull("Reduce Tasks List in ParsedJob is null",
+ parsedJob.obtainReduceTasks());
+ assertNotNull("Other Tasks List in ParsedJob is null",
+ parsedJob.obtainOtherTasks());
+
+ // exactly numMaps map tasks and numReduces reduce tasks are expected
+ assertEquals("Number of map tasks in ParsedJob is wrong",
+ numMaps, parsedJob.obtainMapTasks().size());
+ assertEquals("Number of reduce tasks in ParsedJob is wrong",
+ numReduces, parsedJob.obtainReduceTasks().size());
+
+ assertTrue("Total Counters in ParsedJob is empty",
+ parsedJob.obtainTotalCounters().size() > 0);
+ // Current 0.20 history files contain job-level-map-counters and
+ // job-level-reduce-counters. Older 0.20 history files may not have them.
+ assertTrue("Map Counters in ParsedJob is empty",
+ parsedJob.obtainMapCounters().size() > 0);
+ assertTrue("Reduce Counters in ParsedJob is empty",
+ parsedJob.obtainReduceCounters().size() > 0);
+ }
+
+ /**
+ * Validates the obtainXXX accessors of the given {@link ParsedTask} and of
+ * every {@link ParsedTaskAttempt} it contains.
+ */
+ private void validateParsedTask(ParsedTask parsedTask) {
+ validateParsedTaskAPI(parsedTask);
+
+ // then each attempt of this task
+ for (ParsedTaskAttempt taskAttempt : parsedTask.obtainTaskAttempts()) {
+ validateParsedTaskAttemptAPI(taskAttempt);
+ }
+ }
+
+ /** Verifies that the obtainXXX methods of {@link ParsedTask} return valid info. */
+ private void validateParsedTaskAPI(ParsedTask parsedTask) {
+ LOG.info("Validating ParsedTask.obtainXXX api... for "
+ + parsedTask.getTaskID());
+ assertNotNull("Task counters in ParsedTask is null",
+ parsedTask.obtainCounters());
+
+ if (parsedTask.getTaskStatus()
+ == Pre21JobHistoryConstants.Values.SUCCESS) {
+ // task counters should not be empty
+ assertTrue("Task counters in ParsedTask is empty",
+ parsedTask.obtainCounters().size() > 0);
+ assertNull("Diagnostic-info is non-null for a succeeded task",
+ parsedTask.obtainDiagnosticInfo());
+ assertNull("Failed-due-to-attemptId is non-null for a succeeded task",
+ parsedTask.obtainFailedDueToAttemptId());
+ } else {
+ assertNotNull("Diagnostic-info is null for an unsuccessful task",
+ parsedTask.obtainDiagnosticInfo());
+ assertNotNull("Failed-due-to-attemptId is null for an unsuccessful task",
+ parsedTask.obtainFailedDueToAttemptId());
+ }
+
+ List<ParsedTaskAttempt> attempts = parsedTask.obtainTaskAttempts();
+ assertNotNull("TaskAttempts list in ParsedTask is null", attempts);
+ assertTrue("TaskAttempts list in ParsedTask is empty",
+ attempts.size() > 0);
+ }
+
+ /**
+ * Verifies that the obtainXXX methods of {@link ParsedTaskAttempt} return
+ * valid info
+ */
+ private void validateParsedTaskAttemptAPI(
+ ParsedTaskAttempt parsedTaskAttempt) {
+ LOG.info("Validating ParsedTaskAttempt.obtainXXX api... for "
+ + parsedTaskAttempt.getAttemptID());
+ assertNotNull("Counters in ParsedTaskAttempt is null",
+ parsedTaskAttempt.obtainCounters());
+
+ if (parsedTaskAttempt.getResult()
+ == Pre21JobHistoryConstants.Values.SUCCESS) {
+ assertTrue("Counters in ParsedTaskAttempt is empty",
+ parsedTaskAttempt.obtainCounters().size() > 0);
+ assertNull("Diagnostic-info is non-null for a succeeded taskAttempt",
+ parsedTaskAttempt.obtainDiagnosticInfo());
+ } else {
+ assertNotNull("Diagnostic-info is null for an unsuccessful taskAttempt",
+ parsedTaskAttempt.obtainDiagnosticInfo());
+ }
+ assertNotNull("TrackerName in ParsedTaskAttempt is null",
+ parsedTaskAttempt.obtainTrackerName());
+
+ assertNotNull("http-port info in ParsedTaskAttempt is null",
+ parsedTaskAttempt.obtainHttpPort());
+ }
+
@Test
public void testHadoop20JHParser() throws Exception {
// Disabled
@@ -533,10 +671,12 @@ public class TestRumenJobTraces {
final Path tempDir = new Path(rootTempDir, "TestCurrentJHParser");
lfs.delete(tempDir, true);
+ String queueName = "testQueue";
// Run a MR job
// create a MR cluster
conf.setInt("mapred.tasktracker.map.tasks.maximum", 1);
conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
+ conf.set("mapred.queue.names", queueName);
MiniMRCluster mrCluster = new MiniMRCluster(1, "file:///", 1, null, null,
new JobConf(conf));
@@ -549,8 +689,10 @@ public class TestRumenJobTraces {
RunningJob rJob = null;
try {
+ JobConf jobConf = mrCluster.createJobConf();
+ jobConf.setQueueName(queueName);
// construct a job with 1 map and 1 reduce task.
- rJob = UtilsForTests.runJob(mrCluster.createJobConf(), inDir, outDir, 1,
+ rJob = UtilsForTests.runJob(jobConf, inDir, outDir, 1,
1);
rJob.waitForCompletion();
assertTrue("Job failed", rJob.isSuccessful());
@@ -583,12 +725,24 @@ public class TestRumenJobTraces {
// Test if the JobHistoryParserFactory can detect the parser correctly
parser = JobHistoryParserFactory.getParser(ris);
+ // Get ParsedJob
+ String jobId = TraceBuilder.extractJobID(filePair.first());
+ JobBuilder builder = new JobBuilder(jobId);
+
HistoryEvent e;
while ((e = parser.nextEvent()) != null) {
String eventString = e.getEventType().toString();
System.out.println(eventString);
seenEvents.add(eventString);
+ if (builder != null) {
+ builder.process(e);
+ }
}
+
+ ParsedJob parsedJob = builder.build();
+ // validate the obtainXXX api of ParsedJob, ParsedTask and
+ // ParsedTaskAttempt.
+ validateParsedJob(parsedJob, 1, 1, queueName);
} finally {
// stop the MR cluster
mrCluster.shutdown();
Modified: hadoop/common/branches/branch-1/src/test/tools/data/rumen/small-trace-test/counters-test-trace.json.gz
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/tools/data/rumen/small-trace-test/counters-test-trace.json.gz?rev=1238385&r1=1238384&r2=1238385&view=diff
==============================================================================
Binary files - no diff available.
Modified: hadoop/common/branches/branch-1/src/test/tools/data/rumen/small-trace-test/dispatch-sample-v20-jt-log.gz
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/tools/data/rumen/small-trace-test/dispatch-sample-v20-jt-log.gz?rev=1238385&r1=1238384&r2=1238385&view=diff
==============================================================================
Binary files - no diff available.
Modified: hadoop/common/branches/branch-1/src/test/tools/data/rumen/small-trace-test/dispatch-trace-output.json.gz
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/tools/data/rumen/small-trace-test/dispatch-trace-output.json.gz?rev=1238385&r1=1238384&r2=1238385&view=diff
==============================================================================
Binary files - no diff available.
Modified: hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java?rev=1238385&r1=1238384&r2=1238385&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java (original)
+++ hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java Tue Jan 31 10:46:30 2012
@@ -65,6 +65,7 @@ public class Job20LineHistoryEventEmitte
String jobConf = line.get("JOBCONF");
String user = line.get("USER");
String jobName = line.get("JOBNAME");
+ String queueName = line.get("JOB_QUEUE");
if (submitTime != null) {
Job20LineHistoryEventEmitter that =
@@ -75,7 +76,7 @@ public class Job20LineHistoryEventEmitte
Map<JobACL, AccessControlList> jobACLs =
new HashMap<JobACL, AccessControlList>();
return new JobSubmittedEvent(jobID, jobName, user == null ? "nulluser"
- : user, that.originalSubmitTime, jobConf, jobACLs);
+ : user, that.originalSubmitTime, jobConf, jobACLs, queueName);
}
return null;
@@ -213,6 +214,8 @@ public class Job20LineHistoryEventEmitte
String failedMaps = line.get("FAILED_MAPS");
String failedReduces = line.get("FAILED_REDUCES");
+ String mapCounters = line.get("MAP_COUNTERS");
+ String reduceCounters = line.get("REDUCE_COUNTERS");
String counters = line.get("COUNTERS");
if (status != null && status.equalsIgnoreCase("success")
@@ -220,7 +223,8 @@ public class Job20LineHistoryEventEmitte
&& finishedReduces != null) {
return new JobFinishedEvent(jobID, Long.parseLong(finishTime), Integer
.parseInt(finishedMaps), Integer.parseInt(finishedReduces), Integer
- .parseInt(failedMaps), Integer.parseInt(failedReduces), null, null,
+ .parseInt(failedMaps), Integer.parseInt(failedReduces),
+ maybeParseCounters(mapCounters), maybeParseCounters(reduceCounters),
maybeParseCounters(counters));
}
Modified: hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java?rev=1238385&r1=1238384&r2=1238385&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java (original)
+++ hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java Tue Jan 31 10:46:30 2012
@@ -42,16 +42,16 @@ public class JobBuilder {
private boolean finalized = false;
- private LoggedJob result = new LoggedJob();
+ private ParsedJob result = new ParsedJob();
- private Map<String, LoggedTask> mapTasks = new HashMap<String, LoggedTask>();
- private Map<String, LoggedTask> reduceTasks =
- new HashMap<String, LoggedTask>();
- private Map<String, LoggedTask> otherTasks =
- new HashMap<String, LoggedTask>();
+ private Map<String, ParsedTask> mapTasks = new HashMap<String, ParsedTask>();
+ private Map<String, ParsedTask> reduceTasks =
+ new HashMap<String, ParsedTask>();
+ private Map<String, ParsedTask> otherTasks =
+ new HashMap<String, ParsedTask>();
- private Map<String, LoggedTaskAttempt> attempts =
- new HashMap<String, LoggedTaskAttempt>();
+ private Map<String, ParsedTaskAttempt> attempts =
+ new HashMap<String, ParsedTaskAttempt>();
private Map<ParsedHost, ParsedHost> allHosts =
new HashMap<ParsedHost, ParsedHost>();
@@ -103,7 +103,7 @@ public class JobBuilder {
public void process(HistoryEvent event) {
if (finalized) {
throw new IllegalStateException(
- "JobBuilder.process(HistoryEvent event) called after LoggedJob built");
+ "JobBuilder.process(HistoryEvent event) called after ParsedJob built");
}
// these are in lexicographical order by class name.
@@ -205,12 +205,16 @@ public class JobBuilder {
public void process(Properties conf) {
if (finalized) {
throw new IllegalStateException(
- "JobBuilder.process(Properties conf) called after LoggedJob built");
+ "JobBuilder.process(Properties conf) called after ParsedJob built");
}
//TODO remove this once the deprecate APIs in LoggedJob are removed
- result.setQueue(extract(conf, JobConfPropertyNames.QUEUE_NAMES
- .getCandidates(), "default"));
+ String queue = extract(conf, JobConfPropertyNames.QUEUE_NAMES
+ .getCandidates(), null);
+ // set the queue name if existing
+ if (queue != null) {
+ result.setQueue(queue);
+ }
result.setJobName(extract(conf, JobConfPropertyNames.JOB_NAMES
.getCandidates(), null));
@@ -228,9 +232,9 @@ public class JobBuilder {
* Request the builder to build the final object. Once called, the
* {@link JobBuilder} would accept no more events or job-conf properties.
*
- * @return Parsed {@link LoggedJob} object.
+ * @return Parsed {@link ParsedJob} object.
*/
- public LoggedJob build() {
+ public ParsedJob build() {
// The main job here is to build CDFs and manage the conf
finalized = true;
@@ -394,7 +398,7 @@ public class JobBuilder {
}
private void processTaskUpdatedEvent(TaskUpdatedEvent event) {
- LoggedTask task = getTask(event.getTaskId().toString());
+ ParsedTask task = getTask(event.getTaskId().toString());
if (task == null) {
return;
}
@@ -402,7 +406,7 @@ public class JobBuilder {
}
private void processTaskStartedEvent(TaskStartedEvent event) {
- LoggedTask task =
+ ParsedTask task =
getOrMakeTask(event.getTaskType(), event.getTaskId().toString(), true);
task.setStartTime(event.getStartTime());
task.setPreferredLocations(preferredLocationForSplits(event
@@ -410,7 +414,7 @@ public class JobBuilder {
}
private void processTaskFinishedEvent(TaskFinishedEvent event) {
- LoggedTask task =
+ ParsedTask task =
getOrMakeTask(event.getTaskType(), event.getTaskId().toString(), false);
if (task == null) {
return;
@@ -421,18 +425,21 @@ public class JobBuilder {
}
private void processTaskFailedEvent(TaskFailedEvent event) {
- LoggedTask task =
+ ParsedTask task =
getOrMakeTask(event.getTaskType(), event.getTaskId().toString(), false);
if (task == null) {
return;
}
task.setFinishTime(event.getFinishTime());
task.setTaskStatus(getPre21Value(event.getTaskStatus()));
+ task.putDiagnosticInfo(event.getError());
+ task.putFailedDueToAttemptId(event.getFailedAttemptID() == null ? null : event.getFailedAttemptID().toString());
+ // The failed attempt id may be absent (e.g. killed task); no counters in TaskFailedEvent
}
private void processTaskAttemptUnsuccessfulCompletionEvent(
TaskAttemptUnsuccessfulCompletionEvent event) {
- LoggedTaskAttempt attempt =
+ ParsedTaskAttempt attempt =
getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
event.getTaskAttemptId().toString());
@@ -448,20 +455,24 @@ public class JobBuilder {
}
attempt.setFinishTime(event.getFinishTime());
+ attempt.putDiagnosticInfo(event.getError());
+ // No counters in TaskAttemptUnsuccessfulCompletionEvent
}
private void processTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {
- LoggedTaskAttempt attempt =
+ ParsedTaskAttempt attempt =
getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
event.getTaskAttemptId().toString());
if (attempt == null) {
return;
}
attempt.setStartTime(event.getStartTime());
+ attempt.putTrackerName(event.getTrackerName());
+ attempt.putHttpPort(event.getHttpPort());
}
private void processTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
- LoggedTaskAttempt attempt =
+ ParsedTaskAttempt attempt =
getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
event.getAttemptId().toString());
if (attempt == null) {
@@ -477,7 +488,7 @@ public class JobBuilder {
private void processReduceAttemptFinishedEvent(
ReduceAttemptFinishedEvent event) {
- LoggedTaskAttempt attempt =
+ ParsedTaskAttempt attempt =
getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
event.getAttemptId().toString());
if (attempt == null) {
@@ -496,7 +507,7 @@ public class JobBuilder {
}
private void processMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
- LoggedTaskAttempt attempt =
+ ParsedTaskAttempt attempt =
getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(),
event.getAttemptId().toString());
if (attempt == null) {
@@ -517,6 +528,7 @@ public class JobBuilder {
result.setOutcome(Pre21JobHistoryConstants.Values
.valueOf(event.getStatus()));
result.setFinishTime(event.getFinishTime());
+ // No counters in JobUnsuccessfulCompletionEvent
}
private void processJobSubmittedEvent(JobSubmittedEvent event) {
@@ -524,6 +536,13 @@ public class JobBuilder {
result.setJobName(event.getJobName());
result.setUser(event.getUserName());
result.setSubmitTime(event.getSubmitTime());
+ result.putJobConfPath(event.getJobConfPath());
+
+ String queue = event.getJobQueueName();
+ if (queue != null) {
+ result.setQueue(queue);
+ }
+ result.putJobAcls(event.getJobAcls());
}
private void processJobStatusChangedEvent(JobStatusChangedEvent event) {
@@ -550,10 +569,20 @@ public class JobBuilder {
result.setFinishTime(event.getFinishTime());
result.setJobID(jobID);
result.setOutcome(Values.SUCCESS);
+
+ Map<String, Long> countersMap = JobHistoryUtils.extractCounters(
+ new JhCounters(event.getTotalCounters(), "COUNTERS"));
+ result.putTotalCounters(countersMap);
+ countersMap = JobHistoryUtils.extractCounters(new JhCounters(
+ event.getMapCounters(), "MAP_COUNTERS"));
+ result.putMapCounters(countersMap);
+ countersMap = JobHistoryUtils.extractCounters(new JhCounters(
+ event.getReduceCounters(), "REDUCE_COUNTERS"));
+ result.putReduceCounters(countersMap);
}
- private LoggedTask getTask(String taskIDname) {
- LoggedTask result = mapTasks.get(taskIDname);
+ private ParsedTask getTask(String taskIDname) {
+ ParsedTask result = mapTasks.get(taskIDname);
if (result != null) {
return result;
@@ -577,9 +606,9 @@ public class JobBuilder {
* if true, we can create a task.
* @return
*/
- private LoggedTask getOrMakeTask(TaskType type, String taskIDname,
+ private ParsedTask getOrMakeTask(TaskType type, String taskIDname,
boolean allowCreate) {
- Map<String, LoggedTask> taskMap = otherTasks;
+ Map<String, ParsedTask> taskMap = otherTasks;
List<LoggedTask> tasks = this.result.getOtherTasks();
switch (type) {
@@ -597,10 +626,10 @@ public class JobBuilder {
// no code
}
- LoggedTask result = taskMap.get(taskIDname);
+ ParsedTask result = taskMap.get(taskIDname);
if (result == null && allowCreate) {
- result = new LoggedTask();
+ result = new ParsedTask();
result.setTaskType(getPre21Value(type.toString()));
result.setTaskID(taskIDname);
taskMap.put(taskIDname, result);
@@ -610,13 +639,13 @@ public class JobBuilder {
return result;
}
- private LoggedTaskAttempt getOrMakeTaskAttempt(TaskType type,
+ private ParsedTaskAttempt getOrMakeTaskAttempt(TaskType type,
String taskIDName, String taskAttemptName) {
- LoggedTask task = getOrMakeTask(type, taskIDName, false);
- LoggedTaskAttempt result = attempts.get(taskAttemptName);
+ ParsedTask task = getOrMakeTask(type, taskIDName, false);
+ ParsedTaskAttempt result = attempts.get(taskAttemptName);
if (result == null && task != null) {
- result = new LoggedTaskAttempt();
+ result = new ParsedTaskAttempt();
result.setAttemptID(taskAttemptName);
attempts.put(taskAttemptName, result);
task.getAttempts().add(result);
Added: hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/JobHistoryUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/JobHistoryUtils.java?rev=1238385&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/JobHistoryUtils.java (added)
+++ hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/JobHistoryUtils.java Tue Jan 31 10:46:30 2012
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ *
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public final class JobHistoryUtils {
+ private JobHistoryUtils() {} // static helpers only; not instantiable
+ /**
+ * Flattens counters into a new map keyed by counter name (not group-qualified).
+ * @param counters the counters to extract from; may be null
+ * @return a new map of counter name to value; empty if counters is null
+ */
+ static Map<String, Long> extractCounters(JhCounters counters) {
+ Map<String, Long> countersMap = new HashMap<String, Long>();
+ if (counters != null) {
+ for (JhCounterGroup group : counters.groups) {
+ for (JhCounter counter : group.counts) {
+ countersMap.put(counter.name.toString(), counter.value);
+ }
+ }
+ }
+ return countersMap;
+ }
+}
Modified: hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/JobSubmittedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/JobSubmittedEvent.java?rev=1238385&r1=1238384&r2=1238385&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/JobSubmittedEvent.java (original)
+++ hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/JobSubmittedEvent.java Tue Jan 31 10:46:30 2012
@@ -37,17 +37,32 @@ public class JobSubmittedEvent implement
private long submitTime;
private String jobConfPath;
private Map<JobACL, AccessControlList> jobAcls;
+ private String queue;
/**
* @deprecated Use
- * {@link #JobSubmittedEvent(JobID, String, String, long, String, Map)}
+ * {@link #JobSubmittedEvent(JobID, String, String, long, String,
+ * Map, String)}
* instead.
*/
@Deprecated
public JobSubmittedEvent(JobID id, String jobName, String userName,
long submitTime, String jobConfPath) {
this(id, jobName, userName, submitTime, jobConfPath,
- new HashMap<JobACL, AccessControlList>());
+ new HashMap<JobACL, AccessControlList>(), null);
+ }
+
+ /**
+ * @deprecated Use
+ * {@link #JobSubmittedEvent(JobID, String, String, long, String,
+ * Map, String)}
+ * instead.
+ */
+ @Deprecated
+ public JobSubmittedEvent(JobID id, String jobName, String userName,
+ long submitTime, String jobConfPath,
+ Map<JobACL, AccessControlList> jobACLs) {
+ this(id, jobName, userName, submitTime, jobConfPath, jobACLs, null);
}
/**
@@ -58,16 +73,18 @@ public class JobSubmittedEvent implement
* @param submitTime Time of submission
* @param jobConfPath Path of the Job Configuration file
* @param jobACLs The configured acls for the job.
+ * @param queue job queue name
*/
public JobSubmittedEvent(JobID id, String jobName, String userName,
long submitTime, String jobConfPath,
- Map<JobACL, AccessControlList> jobACLs) {
+ Map<JobACL, AccessControlList> jobACLs, String queue) {
this.jobId = id;
this.jobName = jobName;
this.userName = userName;
this.submitTime = submitTime;
this.jobConfPath = jobConfPath;
this.jobAcls = jobACLs;
+ this.queue = queue;
}
/** Get the Job Id */
@@ -84,7 +101,11 @@ public class JobSubmittedEvent implement
public Map<JobACL, AccessControlList> getJobAcls() {
return jobAcls;
}
-
+
+ public String getJobQueueName() {
+ return queue;
+ }
+
/** Get the event type */
public EventType getEventType() { return EventType.JOB_SUBMITTED; }
Modified: hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java?rev=1238385&r1=1238384&r2=1238385&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java (original)
+++ hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java Tue Jan 31 10:46:30 2012
@@ -359,6 +359,10 @@ public class LoggedJob implements DeepCo
this.relativeTime = relativeTime;
}
+ /**
+ * @return job queue name if it is available in job history file or
+ * job history conf file. Returns null otherwise.
+ */
public String getQueue() {
return queue;
}
Added: hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/ParsedJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/ParsedJob.java?rev=1238385&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/ParsedJob.java (added)
+++ hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/ParsedJob.java Tue Jan 31 10:46:30 2012
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ *
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.security.authorize.AccessControlList;
+
+/**
+ * Extends {@link LoggedJob} with extra information about the job obtained
+ * from job history that is not written to the JSON trace file (counters,
+ * job conf path, job ACLs), exposed through the obtainXXX methods.
+ */
+public class ParsedJob extends LoggedJob {
+
+ private static final Log LOG = LogFactory.getLog(ParsedJob.class);
+
+ private Map<String, Long> totalCountersMap = new HashMap<String, Long>();
+ private Map<String, Long> mapCountersMap = new HashMap<String, Long>();
+ private Map<String, Long> reduceCountersMap = new HashMap<String, Long>();
+
+ private String jobConfPath;
+ private Map<JobACL, AccessControlList> jobAcls;
+
+ ParsedJob() {
+ super();
+ }
+
+ ParsedJob(String jobID) {
+ super();
+
+ setJobID(jobID);
+ }
+
+ /** Set the job total counters */
+ void putTotalCounters(Map<String, Long> totalCounters) {
+ this.totalCountersMap = totalCounters;
+ }
+
+ /**
+ * @return the job total counters
+ */
+ public Map<String, Long> obtainTotalCounters() {
+ return totalCountersMap;
+ }
+
+ /** Set the job level map tasks' counters */
+ void putMapCounters(Map<String, Long> mapCounters) {
+ this.mapCountersMap = mapCounters;
+ }
+
+ /**
+ * @return the job level map tasks' counters
+ */
+ public Map<String, Long> obtainMapCounters() {
+ return mapCountersMap;
+ }
+
+ /** Set the job level reduce tasks' counters */
+ void putReduceCounters(Map<String, Long> reduceCounters) {
+ this.reduceCountersMap = reduceCounters;
+ }
+
+ /**
+ * @return the job level reduce tasks' counters
+ */
+ public Map<String, Long> obtainReduceCounters() {
+ return reduceCountersMap;
+ }
+
+ /** Set the job conf path in staging dir on hdfs */
+ void putJobConfPath(String confPath) {
+ jobConfPath = confPath;
+ }
+
+ /**
+ * @return the job conf path in staging dir on hdfs
+ */
+ public String obtainJobConfpath() {
+ return jobConfPath;
+ }
+
+ /** Set the job acls */
+ void putJobAcls(Map<JobACL, AccessControlList> acls) {
+ jobAcls = acls;
+ }
+
+ /**
+ * @return the job acls
+ */
+ public Map<JobACL, AccessControlList> obtainJobAcls() {
+ return jobAcls;
+ }
+
+ /**
+ * @return the list of map tasks of this job
+ */
+ public List<ParsedTask> obtainMapTasks() {
+ List<LoggedTask> tasks = super.getMapTasks();
+ return convertTasks(tasks);
+ }
+
+ /**
+ * @return the list of reduce tasks of this job
+ */
+ public List<ParsedTask> obtainReduceTasks() {
+ List<LoggedTask> tasks = super.getReduceTasks();
+ return convertTasks(tasks);
+ }
+
+ /**
+ * @return the list of other tasks of this job
+ */
+ public List<ParsedTask> obtainOtherTasks() {
+ List<LoggedTask> tasks = super.getOtherTasks();
+ return convertTasks(tasks);
+ }
+
+ /** Casts the given {@link LoggedTask}s, all expected to be {@link ParsedTask}s.
+ * @return a new list of {@link ParsedTask} objects
+ * @throws IllegalStateException if an element is not a {@link ParsedTask}
+ */
+ private List<ParsedTask> convertTasks(List<LoggedTask> tasks) {
+ List<ParsedTask> result = new ArrayList<ParsedTask>(tasks.size());
+ for (LoggedTask t : tasks) {
+ if (t instanceof ParsedTask) {
+ result.add((ParsedTask) t);
+ } else {
+ throw new IllegalStateException("Unexpected type of task in the list: "
+ + (t == null ? "null" : t.getClass().getName()));
+ }
+ }
+ return result;
+ }
+
+ /** Logs the extra info of this ParsedJob and of all its tasks at INFO level */
+ void dumpParsedJob() {
+ LOG.info("ParsedJob details:" + obtainTotalCounters() + ";"
+ + obtainMapCounters() + ";" + obtainReduceCounters()
+ + "\n" + obtainJobConfpath() + "\n" + obtainJobAcls()
+ + ";Q=" + getQueue());
+ List<ParsedTask> maps = obtainMapTasks();
+ for (ParsedTask task : maps) {
+ task.dumpParsedTask();
+ }
+ List<ParsedTask> reduces = obtainReduceTasks();
+ for (ParsedTask task : reduces) {
+ task.dumpParsedTask();
+ }
+ List<ParsedTask> others = obtainOtherTasks();
+ for (ParsedTask task : others) {
+ task.dumpParsedTask();
+ }
+ }
+}
Added: hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/ParsedTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/ParsedTask.java?rev=1238385&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/ParsedTask.java (added)
+++ hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/ParsedTask.java Tue Jan 31 10:46:30 2012
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A wrapper around {@link LoggedTask} that also carries extra information
+ * about the task obtained from the job history, which is not written to the
+ * JSON trace file: the task counters, the diagnostic-info and the id of the
+ * attempt the task failed due to.
+ */
+public class ParsedTask extends LoggedTask {
+
+  private static final Log LOG = LogFactory.getLog(ParsedTask.class);
+
+  private String diagnosticInfo;
+  private String failedDueToAttempt;
+  private Map<String, Long> countersMap = new HashMap<String, Long>();
+
+  ParsedTask() {
+    super();
+  }
+
+  /**
+   * Incorporates the given job-history counters. They are stored as a
+   * name-to-value map (see {@link #obtainCounters()}) and then passed on to
+   * the {@link LoggedTask} implementation.
+   *
+   * @param counters the counters of this task read from the job history
+   */
+  @Override
+  public void incorporateCounters(JhCounters counters) {
+    Map<String, Long> extracted = JobHistoryUtils.extractCounters(counters);
+    putCounters(extracted);
+
+    super.incorporateCounters(counters);
+  }
+
+  /**
+   * Sets the task counters. The map is stored by reference, not copied.
+   *
+   * @param counters map from counter name to counter value
+   */
+  public void putCounters(Map<String, Long> counters) {
+    this.countersMap = counters;
+  }
+
+  /**
+   * @return the task counters; an empty map if none were ever set
+   */
+  public Map<String, Long> obtainCounters() {
+    return countersMap;
+  }
+
+  /**
+   * Sets the task diagnostic-info.
+   *
+   * @param msg the diagnostic message
+   */
+  public void putDiagnosticInfo(String msg) {
+    diagnosticInfo = msg;
+  }
+
+  /**
+   * @return the diagnostic-info of this task.
+   *         If the task is successful, returns null.
+   */
+  public String obtainDiagnosticInfo() {
+    return diagnosticInfo;
+  }
+
+  /**
+   * Sets the failed-due-to-attemptId info of this task.
+   *
+   * @param attempt id of the attempt due to which this task failed
+   */
+  public void putFailedDueToAttemptId(String attempt) {
+    failedDueToAttempt = attempt;
+  }
+
+  /**
+   * @return the failed-due-to-attemptId info of this task.
+   *         If the task is successful, returns null.
+   */
+  public String obtainFailedDueToAttemptId() {
+    return failedDueToAttempt;
+  }
+
+  /**
+   * @return the attempts of this task, typed as {@link ParsedTaskAttempt}
+   * @throws RuntimeException if an attempt is not a ParsedTaskAttempt
+   */
+  public List<ParsedTaskAttempt> obtainTaskAttempts() {
+    List<LoggedTaskAttempt> attempts = getAttempts();
+    return convertTaskAttempts(attempts);
+  }
+
+  /**
+   * Narrows a list of {@link LoggedTaskAttempt} objects to
+   * {@link ParsedTaskAttempt}. The attempts of a ParsedTask are expected to
+   * be ParsedTaskAttempt instances; anything else (including {@code null})
+   * is rejected.
+   *
+   * @param attempts the attempts to convert; {@code null} is treated as empty
+   * @return a new list with the same attempts in the same order
+   * @throws RuntimeException if an element is not a ParsedTaskAttempt
+   */
+  List<ParsedTaskAttempt> convertTaskAttempts(
+      List<LoggedTaskAttempt> attempts) {
+    if (attempts == null) {
+      return new ArrayList<ParsedTaskAttempt>();
+    }
+    List<ParsedTaskAttempt> result =
+        new ArrayList<ParsedTaskAttempt>(attempts.size());
+    for (LoggedTaskAttempt t : attempts) {
+      if (!(t instanceof ParsedTaskAttempt)) {
+        // Name the offending type so that a mis-built task is easy to debug.
+        throw new RuntimeException(
+            "Unexpected type of taskAttempts in the list: "
+            + (t == null ? "null" : t.getClass().getName()));
+      }
+      result.add((ParsedTaskAttempt) t);
+    }
+    return result;
+  }
+
+  /**
+   * Logs the extra info of this ParsedTask (counters, diagnostic-info,
+   * failed-due-to attempt id and preferred locations), then the extra info
+   * of each of its attempts.
+   */
+  void dumpParsedTask() {
+    LOG.info("ParsedTask details:" + obtainCounters()
+        + ";DiagnosticInfo=" + obtainDiagnosticInfo()
+        + "\n" + obtainFailedDueToAttemptId()
+        + "\nPreferred Locations are:");
+    List<LoggedLocation> loc = getPreferredLocations();
+    if (loc != null) {
+      for (LoggedLocation l : loc) {
+        LOG.info(l.getLayers() + ";" + l.toString());
+      }
+    }
+    for (ParsedTaskAttempt attempt : obtainTaskAttempts()) {
+      attempt.dumpParsedTaskAttempt();
+    }
+  }
+}
Added: hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/ParsedTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/ParsedTaskAttempt.java?rev=1238385&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/ParsedTaskAttempt.java (added)
+++ hadoop/common/branches/branch-1/src/tools/org/apache/hadoop/tools/rumen/ParsedTaskAttempt.java Tue Jan 31 10:46:30 2012
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.rumen;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A wrapper around {@link LoggedTaskAttempt} that also carries extra
+ * information about the task attempt obtained from the job history, which is
+ * not written to the JSON trace file: counters, diagnostic-info, state
+ * string, tracker name and http port.
+ */
+public class ParsedTaskAttempt extends LoggedTaskAttempt {
+
+  private static final Log LOG = LogFactory.getLog(ParsedTaskAttempt.class);
+
+  private String diagnosticInfo;
+  private String stateString;
+  private String trackerName;
+  private Integer httpPort;
+  private Map<String, Long> countersMap = new HashMap<String, Long>();
+
+  ParsedTaskAttempt() {
+    super();
+  }
+
+  /**
+   * Incorporates the given job-history counters. They are stored as a
+   * name-to-value map (see {@link #obtainCounters()}) and then passed on to
+   * the {@link LoggedTaskAttempt} implementation.
+   *
+   * @param counters the counters of this attempt read from the job history
+   */
+  @Override
+  public void incorporateCounters(JhCounters counters) {
+    Map<String, Long> extracted = JobHistoryUtils.extractCounters(counters);
+    putCounters(extracted);
+
+    super.incorporateCounters(counters);
+  }
+
+  /**
+   * Sets the task attempt counters. The map is stored by reference, not
+   * copied.
+   *
+   * @param counters map from counter name to counter value
+   */
+  public void putCounters(Map<String, Long> counters) {
+    this.countersMap = counters;
+  }
+
+  /**
+   * @return the task attempt counters; an empty map if none were ever set
+   */
+  public Map<String, Long> obtainCounters() {
+    return countersMap;
+  }
+
+  /**
+   * Sets the task attempt diagnostic-info.
+   *
+   * @param msg the diagnostic message
+   */
+  public void putDiagnosticInfo(String msg) {
+    diagnosticInfo = msg;
+  }
+
+  /**
+   * @return the diagnostic-info of this task attempt.
+   *         If the attempt is successful, returns null.
+   */
+  public String obtainDiagnosticInfo() {
+    return diagnosticInfo;
+  }
+
+  /**
+   * Sets the name of the tracker where the attempt was run.
+   *
+   * @param trackerName the tracker name
+   */
+  void putTrackerName(String trackerName) {
+    this.trackerName = trackerName;
+  }
+
+  /**
+   * @return the tracker name where the attempt was run, or null if not set.
+   */
+  public String obtainTrackerName() {
+    return trackerName;
+  }
+
+  /**
+   * Sets the http port; after this call {@link #obtainHttpPort()} no longer
+   * returns null.
+   *
+   * @param port the http port
+   */
+  void putHttpPort(int port) {
+    httpPort = port;
+  }
+
+  /**
+   * @return http port if set. Returns null otherwise.
+   */
+  public Integer obtainHttpPort() {
+    return httpPort;
+  }
+
+  /**
+   * Sets the state string of the task attempt.
+   *
+   * @param state the state string
+   */
+  void putStateString(String state) {
+    stateString = state;
+  }
+
+  /**
+   * @return state string of the task attempt, or null if not set.
+   */
+  public String obtainStateString() {
+    return stateString;
+  }
+
+  /**
+   * Logs the extra info of this ParsedTaskAttempt: counters,
+   * diagnostic-info, tracker name, http port and host name.
+   */
+  void dumpParsedTaskAttempt() {
+    LOG.info("ParsedTaskAttempt details:" + obtainCounters()
+        + ";DiagnosticInfo=" + obtainDiagnosticInfo() + "\n"
+        + obtainTrackerName() + ";" + obtainHttpPort()
+        + ";host=" + getHostName());
+  }
+}