Posted to commits@chukwa.apache.org by as...@apache.org on 2009/03/11 23:39:32 UTC
svn commit: r752666 [8/16] - in /hadoop/chukwa/trunk: ./
src/java/org/apache/hadoop/chukwa/ src/java/org/apache/hadoop/chukwa/conf/
src/java/org/apache/hadoop/chukwa/database/
src/java/org/apache/hadoop/chukwa/datacollection/
src/java/org/apache/hadoop...
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java Wed Mar 11 22:39:26 2009
@@ -18,12 +18,12 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.extraction.engine.Record;
@@ -31,386 +31,375 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
-public class JobLogHistoryProcessor extends AbstractProcessor
-{
- static Logger log = Logger.getLogger(JobLogHistoryProcessor.class);
-
- private static final String recordType = "JobLogHistory";
- private static String internalRegex = null;
- private static Pattern ip = null;
-
- private Matcher internalMatcher = null;
-
- public JobLogHistoryProcessor()
- {
- internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?";
- ip = Pattern.compile(internalRegex);
- internalMatcher = ip.matcher("-");
- }
-
- @Override
- protected void parse(String recordEntry,
- OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
- Reporter reporter)
- throws Throwable
- {
-
-// log.info("JobLogHistoryProcessor record: [" + recordEntry + "] type["
-// + chunk.getDataType() + "]");
-
- try
- {
-
- HashMap<String, String> keys = new HashMap<String, String>();
- ChukwaRecord record = null;
-
- int firstSep = recordEntry.indexOf(" ");
- keys.put("RECORD_TYPE", recordEntry.substring(0, firstSep));
-// log.info("JobLogHistoryProcessor Add field: [RECORD_TYPE]["
-// + keys.get("RECORD_TYPE") + "]");
-
- String body = recordEntry.substring(firstSep);
-
- internalMatcher.reset(body);
-
-// String fieldName = null;
-// String fieldValue = null;
-
- while (internalMatcher.matches())
- {
-
- keys.put(internalMatcher.group(1).trim(), internalMatcher
- .group(2).trim());
-
- // TODO Remove debug info before production
-// fieldName = internalMatcher.group(1).trim();
-// fieldValue = internalMatcher.group(2).trim();
-// log.info("JobLogHistoryProcessor Add field: [" + fieldName +
-// "][" + fieldValue +"]" );
-// log.info("EOL : [" + internalMatcher.group(3) + "]" );
- internalMatcher.reset(internalMatcher.group(3));
- }
-
- if (!keys.containsKey("JOBID"))
- {
- // Extract JobID from taskID
- // JOBID = "job_200804210403_0005"
- // TASKID = "tip_200804210403_0005_m_000018"
- String jobId = keys.get("TASKID");
- int idx1 = jobId.indexOf('_',0);
- int idx2 = jobId.indexOf('_', idx1+1);
- idx2 = jobId.indexOf('_', idx2+1);
- keys.put("JOBID",jobId.substring(idx1+1,idx2));
-// log.info("JobLogHistoryProcessor Add field: [JOBID]["
-// + keys.get("JOBID") + "]");
- }
- else
- {
- String jobId = keys.get("JOBID").replace("_", "").substring(3);
- keys.put("JOBID",jobId);
- }
- // if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
- // keys.containsKey("SUBMIT_TIME"))
- // {
- // // Job JOBID="job_200804210403_0005" JOBNAME="MY_JOB"
- // USER="userxxx"
- // // SUBMIT_TIME="1208760436751"
- // JOBCONF="/mapredsystem/xxx.yyy.com/job_200804210403_0005/job.xml"
- //
- //
- // }
- // else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
- // keys.containsKey("LAUNCH_TIME"))
- // {
- // // Job JOBID="job_200804210403_0005" LAUNCH_TIME="1208760437110"
- // TOTAL_MAPS="5912" TOTAL_REDUCES="739"
- //
- // }
- // else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
- // keys.containsKey("FINISH_TIME"))
- // {
- // // Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906816"
- // JOB_STATUS="SUCCESS" FINISHED_MAPS="5912" FINISHED_REDUCES="739"
- // FAILED_MAPS="0" FAILED_REDUCES="0"
- // // COUNTERS="File Systems.Local bytes read:1735053407244,File
- // Systems.Local bytes written:2610106384012,File Systems.HDFS bytes
- // read:801605644910,File Systems.HDFS bytes written:44135800,
- // // Job Counters .Launched map tasks:5912,Job Counters .Launched
- // reduce tasks:739,Job Counters .Data-local map tasks:5573,Job
- // Counters .Rack-local map tasks:316,Map-Reduce Framework.
- // // Map input records:9410696067,Map-Reduce Framework.Map output
- // records:9410696067,Map-Reduce Framework.Map input
- // bytes:801599188816,Map-Reduce Framework.Map output
- // bytes:784427968116,
- // // Map-Reduce Framework.Combine input records:0,Map-Reduce
- // Framework.Combine output records:0,Map-Reduce Framework.Reduce
- // input groups:477265,Map-Reduce Framework.Reduce input
- // records:739000,
- // // Map-Reduce Framework.Reduce output records:739000"
- //
- // }
- // else
- if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
- && keys.containsKey("START_TIME"))
- {
- // MapAttempt TASK_TYPE="MAP"
- // TASKID="tip_200804210403_0005_m_000018"
- // TASK_ATTEMPT_ID="task_200804210403_0005_m_000018_0"
- // START_TIME="1208760437531"
- // HOSTNAME="tracker_xxx.yyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:53734"
-
- key = new ChukwaRecordKey();
- key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/" + keys.get("START_TIME"));
- key.setReduceType("JobLogHistoryReduceProcessor");
- record = new ChukwaRecord();
- record.setTime(Long.parseLong(keys.get("START_TIME")));
- record.add("JOBID",keys.get("JOBID"));
- record.add("START_TIME", keys.get("START_TIME"));
- record.add(Record.tagsField, chunk.getTags());
-// log.info("JobLogHist/Map/S");
- output.collect(key, record);
-
- } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
- && keys.containsKey("FINISH_TIME"))
- {
- // MapAttempt TASK_TYPE="MAP"
- // TASKID="tip_200804210403_0005_m_005494"
- // TASK_ATTEMPT_ID="task_200804210403_0005_m_005494_0"
- // TASK_STATUS="SUCCESS"
- // FINISH_TIME="1208760624124"
- // HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:55491"
-
- key = new ChukwaRecordKey();
- key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/" + keys.get("FINISH_TIME"));
- key.setReduceType("JobLogHistoryReduceProcessor");
- record = new ChukwaRecord();
- record.setTime(Long.parseLong(keys.get("FINISH_TIME")));
- record.add("JOBID",keys.get("JOBID"));
- record.add("FINISH_TIME", keys.get("FINISH_TIME"));
- record.add(Record.tagsField, chunk.getTags());
-// log.info("JobLogHist/Map/E");
- output.collect(key, record);
- }
-
- else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt")
- && keys.containsKey("START_TIME"))
- {
- // ReduceAttempt TASK_TYPE="REDUCE"
- // TASKID="tip_200804210403_0005_r_000138"
- // TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0"
- // START_TIME="1208760454885"
- // HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947"
-
- key = new ChukwaRecordKey();
- key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/" + keys.get("START_TIME"));
- key.setReduceType("JobLogHistoryReduceProcessor");
- record = new ChukwaRecord();
- record.setTime(Long.parseLong(keys.get("START_TIME")));
- record.add("JOBID",keys.get("JOBID"));
- record.add("START_TIME", keys.get("START_TIME"));
- record.add(Record.tagsField, chunk.getTags());
-// log.info("JobLogHist/SHUFFLE/S");
- output.collect(key, record);
-
- } else if (keys.get("RECORD_TYPE")
- .equalsIgnoreCase("ReduceAttempt")
- && keys.containsKey("FINISH_TIME"))
- {
- // ReduceAttempt TASK_TYPE="REDUCE"
- // TASKID="tip_200804210403_0005_r_000138"
- // TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0"
- // TASK_STATUS="SUCCESS" SHUFFLE_FINISHED="1208760787167"
- // SORT_FINISHED="1208760787354" FINISH_TIME="1208760802395"
- // HOSTNAME="tracker__xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947"
-
- key = new ChukwaRecordKey();
- key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/" + keys.get("SHUFFLE_FINISHED"));
- key.setReduceType("JobLogHistoryReduceProcessor");
- record = new ChukwaRecord();
- record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
- record.add("JOBID",keys.get("JOBID"));
- record.add("SHUFFLE_FINISHED", keys.get("SHUFFLE_FINISHED"));
- record.add(Record.tagsField, chunk.getTags());
-// log.info("JobLogHist/SHUFFLE/E");
- output.collect(key, record);
-
- // SORT
- key = new ChukwaRecordKey();
- key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/" + keys.get("SHUFFLE_FINISHED"));
- key.setReduceType("JobLogHistoryReduceProcessor");
- record = new ChukwaRecord();
- record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
- record.add("JOBID",keys.get("JOBID"));
- record.add("START_TIME", keys.get("SHUFFLE_FINISHED"));
- record.add(Record.tagsField, chunk.getTags());
-// log.info("JobLogHist/SORT/S");
- output.collect(key, record);
-
- key = new ChukwaRecordKey();
- key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/" + keys.get("SORT_FINISHED"));
- key.setReduceType("JobLogHistoryReduceProcessor");
- record = new ChukwaRecord();
- record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
- record.add("JOBID",keys.get("JOBID"));
- record.add("SORT_FINISHED", keys.get("SORT_FINISHED"));
- record.add(Record.tagsField, chunk.getTags());
-// log.info("JobLogHist/SORT/E");
- output.collect(key, record);
-
- // Reduce
- key = new ChukwaRecordKey();
- key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/" + keys.get("SORT_FINISHED"));
- key.setReduceType("JobLogHistoryReduceProcessor");
- record = new ChukwaRecord();
- record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
- record.add("JOBID",keys.get("JOBID"));
- record.add("START_TIME", keys.get("SORT_FINISHED"));
- record.add(Record.tagsField, chunk.getTags());
-// log.info("JobLogHist/REDUCE/S");
- output.collect(key, record);
-
- key = new ChukwaRecordKey();
- key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/" + keys.get("FINISH_TIME"));
- key.setReduceType("JobLogHistoryReduceProcessor");
- record = new ChukwaRecord();
- record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
- record.add("JOBID",keys.get("JOBID"));
- record.add("FINISH_TIME", keys.get("SORT_FINISHED"));
- record.add(Record.tagsField, chunk.getTags());
-// log.info("JobLogHist/REDUCE/E");
- output.collect(key, record);
-
- } else if ( keys.get("RECORD_TYPE").equalsIgnoreCase("Job") )
- {
- // 1
- // Job JOBID="job_200809062051_0001" JOBNAME="wordcount" USER="xxx"
- // SUBMIT_TIME="1208760906812"
- // JOBCONF="/user/xxx/mapredsystem/563976.yyy.zzz.com/job_200809062051_0001/job.xml"
-
- // 2
- // Job JOBID="job_200809062051_0001" LAUNCH_TIME="1208760906816" TOTAL_MAPS="3" TOTAL_REDUCES="7"
-
- // 3
- // Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906826"
- // JOB_STATUS="SUCCESS" FINISHED_MAPS="5912"
- // FINISHED_REDUCES="739" FAILED_MAPS="0" FAILED_REDUCES="0"
- // COUNTERS="File Systems.Local bytes read:1735053407244,File
- // Systems.Local bytes written:2610106384012,File Systems.HDFS
- // bytes read:801605644910,File Systems.HDFS bytes
- // written:44135800,
- // Job Counters .Launched map tasks:5912,Job Counters .Launched
- // reduce tasks:739,Job Counters .Data-local map tasks:5573,Job
- // Counters .Rack-local map tasks:316,Map-Reduce Framework.
- // Map input records:9410696067,Map-Reduce Framework.Map output
- // records:9410696067,Map-Reduce Framework.Map input
- // bytes:801599188816,Map-Reduce Framework.Map output
- // bytes:784427968116,
- // Map-Reduce Framework.Combine input records:0,Map-Reduce
- // Framework.Combine output records:0,Map-Reduce
- // Framework.Reduce input groups:477265,Map-Reduce
- // Framework.Reduce input records:739000,
- // Map-Reduce Framework.Reduce output records:739000"
-
- record = new ChukwaRecord();
- key = new ChukwaRecordKey();
- buildGenericRecord(record, null, Long.parseLong(keys.get("FINISH_TIME")), "MRJob");
- if (keys.containsKey("COUNTERS"))
- {
- extractCounters(record, keys.get("COUNTERS"));
- }
-
- key = new ChukwaRecordKey();
- key.setKey("MRJob/" + keys.get("JOBID") );
- key.setReduceType("MRJobReduceProcessor");
-
- record = new ChukwaRecord();
- record.add(Record.tagsField, chunk.getTags());
- if (keys.containsKey("SUBMIT_TIME"))
- { record.setTime(Long.parseLong(keys.get("SUBMIT_TIME"))); }
- else if (keys.containsKey("LAUNCH_TIME"))
- { record.setTime(Long.parseLong(keys.get("LAUNCH_TIME"))); }
- else if (keys.containsKey("FINISH_TIME"))
- { record.setTime(Long.parseLong(keys.get("FINISH_TIME"))); }
-
- Iterator<String> it = keys.keySet().iterator();
- while(it.hasNext())
- {
- String field = it.next();
- record.add(field, keys.get(field));
- }
-
-
- output.collect(key, record);
- }
-
- if (keys.containsKey("TASK_TYPE")
- && keys.containsKey("COUNTERS")
- && (keys.get("TASK_TYPE").equalsIgnoreCase("REDUCE") || keys
- .get("TASK_TYPE").equalsIgnoreCase("MAP")))
- {
- // MAP
- // Task TASKID="tip_200804210403_0005_m_000154" TASK_TYPE="MAP"
- // TASK_STATUS="SUCCESS" FINISH_TIME="1208760463883"
- // COUNTERS="File Systems.Local bytes read:159265655,File
- // Systems.Local bytes written:318531310,
- // File Systems.HDFS bytes read:145882417,Map-Reduce
- // Framework.Map input records:1706604,
- // Map-Reduce Framework.Map output records:1706604,Map-Reduce
- // Framework.Map input bytes:145882057,
- // Map-Reduce Framework.Map output bytes:142763253,Map-Reduce
- // Framework.Combine input records:0,Map-Reduce
- // Framework.Combine output records:0"
-
- // REDUCE
- // Task TASKID="tip_200804210403_0005_r_000524"
- // TASK_TYPE="REDUCE" TASK_STATUS="SUCCESS"
- // FINISH_TIME="1208760877072"
- // COUNTERS="File Systems.Local bytes read:1179319677,File
- // Systems.Local bytes written:1184474889,File Systems.HDFS
- // bytes written:59021,
- // Map-Reduce Framework.Reduce input groups:684,Map-Reduce
- // Framework.Reduce input records:1000,Map-Reduce
- // Framework.Reduce output records:1000"
-
- record = new ChukwaRecord();
- key = new ChukwaRecordKey();
- buildGenericRecord(record, null, Long.parseLong(keys.get("FINISH_TIME")), "SizeVsFinishTime");
- extractCounters(record, keys.get("COUNTERS"));
- record.add("JOBID",keys.get("JOBID"));
- record.add("TASKID", keys.get("TASKID"));
- record.add("TASK_TYPE", keys.get("TASK_TYPE"));
- record.add(Record.tagsField, chunk.getTags());
-// log.info("MR_Graph +1");
- output.collect(key, record);
-
- }
- }
- catch (IOException e)
- {
- log.warn("Unable to collect output in JobLogHistoryProcessor ["
- + recordEntry + "]", e);
- e.printStackTrace();
- throw e;
- }
-
- }
-
- protected void extractCounters(ChukwaRecord record, String input)
- {
-
- String[] data = null;
- String[] counters = input.split(",");
-
- for (String counter : counters)
- {
- data = counter.split(":");
- record.add(data[0].replaceAll(" ", "_").replaceAll("\\.", "_")
- .toUpperCase(), data[1]);
- }
- }
-
- public String getDataType()
- {
- return JobLogHistoryProcessor.recordType;
- }
+public class JobLogHistoryProcessor extends AbstractProcessor {
+ static Logger log = Logger.getLogger(JobLogHistoryProcessor.class);
+
+ private static final String recordType = "JobLogHistory";
+ private static String internalRegex = null;
+ private static Pattern ip = null;
+
+ private Matcher internalMatcher = null;
+
+ public JobLogHistoryProcessor() {
+ internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?";
+ ip = Pattern.compile(internalRegex);
+ internalMatcher = ip.matcher("-");
+ }
+
+ @Override
+ protected void parse(String recordEntry,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+ throws Throwable {
+
+ // log.info("JobLogHistoryProcessor record: [" + recordEntry + "] type["
+ // + chunk.getDataType() + "]");
+
+ try {
+
+ HashMap<String, String> keys = new HashMap<String, String>();
+ ChukwaRecord record = null;
+
+ int firstSep = recordEntry.indexOf(" ");
+ keys.put("RECORD_TYPE", recordEntry.substring(0, firstSep));
+ // log.info("JobLogHistoryProcessor Add field: [RECORD_TYPE]["
+ // + keys.get("RECORD_TYPE") + "]");
+
+ String body = recordEntry.substring(firstSep);
+
+ internalMatcher.reset(body);
+
+ // String fieldName = null;
+ // String fieldValue = null;
+
+ while (internalMatcher.matches()) {
+
+ keys.put(internalMatcher.group(1).trim(), internalMatcher.group(2)
+ .trim());
+
+ // TODO Remove debug info before production
+ // fieldName = internalMatcher.group(1).trim();
+ // fieldValue = internalMatcher.group(2).trim();
+ // log.info("JobLogHistoryProcessor Add field: [" + fieldName +
+ // "][" + fieldValue +"]" );
+ // log.info("EOL : [" + internalMatcher.group(3) + "]" );
+ internalMatcher.reset(internalMatcher.group(3));
+ }
+
+ if (!keys.containsKey("JOBID")) {
+ // Extract JobID from taskID
+ // JOBID = "job_200804210403_0005"
+ // TASKID = "tip_200804210403_0005_m_000018"
+ String jobId = keys.get("TASKID");
+ int idx1 = jobId.indexOf('_', 0);
+ int idx2 = jobId.indexOf('_', idx1 + 1);
+ idx2 = jobId.indexOf('_', idx2 + 1);
+ keys.put("JOBID", jobId.substring(idx1 + 1, idx2));
+ // log.info("JobLogHistoryProcessor Add field: [JOBID]["
+ // + keys.get("JOBID") + "]");
+ } else {
+ String jobId = keys.get("JOBID").replace("_", "").substring(3);
+ keys.put("JOBID", jobId);
+ }
+ // if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
+ // keys.containsKey("SUBMIT_TIME"))
+ // {
+ // // Job JOBID="job_200804210403_0005" JOBNAME="MY_JOB"
+ // USER="userxxx"
+ // // SUBMIT_TIME="1208760436751"
+ // JOBCONF="/mapredsystem/xxx.yyy.com/job_200804210403_0005/job.xml"
+ //
+ //
+ // }
+ // else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
+ // keys.containsKey("LAUNCH_TIME"))
+ // {
+ // // Job JOBID="job_200804210403_0005" LAUNCH_TIME="1208760437110"
+ // TOTAL_MAPS="5912" TOTAL_REDUCES="739"
+ //
+ // }
+ // else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
+ // keys.containsKey("FINISH_TIME"))
+ // {
+ // // Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906816"
+ // JOB_STATUS="SUCCESS" FINISHED_MAPS="5912" FINISHED_REDUCES="739"
+ // FAILED_MAPS="0" FAILED_REDUCES="0"
+ // // COUNTERS="File Systems.Local bytes read:1735053407244,File
+ // Systems.Local bytes written:2610106384012,File Systems.HDFS bytes
+ // read:801605644910,File Systems.HDFS bytes written:44135800,
+ // // Job Counters .Launched map tasks:5912,Job Counters .Launched
+ // reduce tasks:739,Job Counters .Data-local map tasks:5573,Job
+ // Counters .Rack-local map tasks:316,Map-Reduce Framework.
+ // // Map input records:9410696067,Map-Reduce Framework.Map output
+ // records:9410696067,Map-Reduce Framework.Map input
+ // bytes:801599188816,Map-Reduce Framework.Map output
+ // bytes:784427968116,
+ // // Map-Reduce Framework.Combine input records:0,Map-Reduce
+ // Framework.Combine output records:0,Map-Reduce Framework.Reduce
+ // input groups:477265,Map-Reduce Framework.Reduce input
+ // records:739000,
+ // // Map-Reduce Framework.Reduce output records:739000"
+ //
+ // }
+ // else
+ if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
+ && keys.containsKey("START_TIME")) {
+ // MapAttempt TASK_TYPE="MAP"
+ // TASKID="tip_200804210403_0005_m_000018"
+ // TASK_ATTEMPT_ID="task_200804210403_0005_m_000018_0"
+ // START_TIME="1208760437531"
+ // HOSTNAME="tracker_xxx.yyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:53734"
+
+ key = new ChukwaRecordKey();
+ key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/"
+ + keys.get("START_TIME"));
+ key.setReduceType("JobLogHistoryReduceProcessor");
+ record = new ChukwaRecord();
+ record.setTime(Long.parseLong(keys.get("START_TIME")));
+ record.add("JOBID", keys.get("JOBID"));
+ record.add("START_TIME", keys.get("START_TIME"));
+ record.add(Record.tagsField, chunk.getTags());
+ // log.info("JobLogHist/Map/S");
+ output.collect(key, record);
+
+ } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
+ && keys.containsKey("FINISH_TIME")) {
+ // MapAttempt TASK_TYPE="MAP"
+ // TASKID="tip_200804210403_0005_m_005494"
+ // TASK_ATTEMPT_ID="task_200804210403_0005_m_005494_0"
+ // TASK_STATUS="SUCCESS"
+ // FINISH_TIME="1208760624124"
+ // HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:55491"
+
+ key = new ChukwaRecordKey();
+ key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/"
+ + keys.get("FINISH_TIME"));
+ key.setReduceType("JobLogHistoryReduceProcessor");
+ record = new ChukwaRecord();
+ record.setTime(Long.parseLong(keys.get("FINISH_TIME")));
+ record.add("JOBID", keys.get("JOBID"));
+ record.add("FINISH_TIME", keys.get("FINISH_TIME"));
+ record.add(Record.tagsField, chunk.getTags());
+ // log.info("JobLogHist/Map/E");
+ output.collect(key, record);
+ }
+
+ else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt")
+ && keys.containsKey("START_TIME")) {
+ // ReduceAttempt TASK_TYPE="REDUCE"
+ // TASKID="tip_200804210403_0005_r_000138"
+ // TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0"
+ // START_TIME="1208760454885"
+ // HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947"
+
+ key = new ChukwaRecordKey();
+ key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/"
+ + keys.get("START_TIME"));
+ key.setReduceType("JobLogHistoryReduceProcessor");
+ record = new ChukwaRecord();
+ record.setTime(Long.parseLong(keys.get("START_TIME")));
+ record.add("JOBID", keys.get("JOBID"));
+ record.add("START_TIME", keys.get("START_TIME"));
+ record.add(Record.tagsField, chunk.getTags());
+ // log.info("JobLogHist/SHUFFLE/S");
+ output.collect(key, record);
+
+ } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt")
+ && keys.containsKey("FINISH_TIME")) {
+ // ReduceAttempt TASK_TYPE="REDUCE"
+ // TASKID="tip_200804210403_0005_r_000138"
+ // TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0"
+ // TASK_STATUS="SUCCESS" SHUFFLE_FINISHED="1208760787167"
+ // SORT_FINISHED="1208760787354" FINISH_TIME="1208760802395"
+ // HOSTNAME="tracker__xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947"
+
+ key = new ChukwaRecordKey();
+ key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/"
+ + keys.get("SHUFFLE_FINISHED"));
+ key.setReduceType("JobLogHistoryReduceProcessor");
+ record = new ChukwaRecord();
+ record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
+ record.add("JOBID", keys.get("JOBID"));
+ record.add("SHUFFLE_FINISHED", keys.get("SHUFFLE_FINISHED"));
+ record.add(Record.tagsField, chunk.getTags());
+ // log.info("JobLogHist/SHUFFLE/E");
+ output.collect(key, record);
+
+ // SORT
+ key = new ChukwaRecordKey();
+ key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/"
+ + keys.get("SHUFFLE_FINISHED"));
+ key.setReduceType("JobLogHistoryReduceProcessor");
+ record = new ChukwaRecord();
+ record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
+ record.add("JOBID", keys.get("JOBID"));
+ record.add("START_TIME", keys.get("SHUFFLE_FINISHED"));
+ record.add(Record.tagsField, chunk.getTags());
+ // log.info("JobLogHist/SORT/S");
+ output.collect(key, record);
+
+ key = new ChukwaRecordKey();
+ key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/"
+ + keys.get("SORT_FINISHED"));
+ key.setReduceType("JobLogHistoryReduceProcessor");
+ record = new ChukwaRecord();
+ record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
+ record.add("JOBID", keys.get("JOBID"));
+ record.add("SORT_FINISHED", keys.get("SORT_FINISHED"));
+ record.add(Record.tagsField, chunk.getTags());
+ // log.info("JobLogHist/SORT/E");
+ output.collect(key, record);
+
+ // Reduce
+ key = new ChukwaRecordKey();
+ key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/"
+ + keys.get("SORT_FINISHED"));
+ key.setReduceType("JobLogHistoryReduceProcessor");
+ record = new ChukwaRecord();
+ record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
+ record.add("JOBID", keys.get("JOBID"));
+ record.add("START_TIME", keys.get("SORT_FINISHED"));
+ record.add(Record.tagsField, chunk.getTags());
+ // log.info("JobLogHist/REDUCE/S");
+ output.collect(key, record);
+
+ key = new ChukwaRecordKey();
+ key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/"
+ + keys.get("FINISH_TIME"));
+ key.setReduceType("JobLogHistoryReduceProcessor");
+ record = new ChukwaRecord();
+ record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
+ record.add("JOBID", keys.get("JOBID"));
+ record.add("FINISH_TIME", keys.get("SORT_FINISHED"));
+ record.add(Record.tagsField, chunk.getTags());
+ // log.info("JobLogHist/REDUCE/E");
+ output.collect(key, record);
+
+ } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job")) {
+ // 1
+ // Job JOBID="job_200809062051_0001" JOBNAME="wordcount" USER="xxx"
+ // SUBMIT_TIME="1208760906812"
+ // JOBCONF=
+ // "/user/xxx/mapredsystem/563976.yyy.zzz.com/job_200809062051_0001/job.xml"
+
+ // 2
+ // Job JOBID="job_200809062051_0001" LAUNCH_TIME="1208760906816"
+ // TOTAL_MAPS="3" TOTAL_REDUCES="7"
+
+ // 3
+ // Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906826"
+ // JOB_STATUS="SUCCESS" FINISHED_MAPS="5912"
+ // FINISHED_REDUCES="739" FAILED_MAPS="0" FAILED_REDUCES="0"
+ // COUNTERS="File Systems.Local bytes read:1735053407244,File
+ // Systems.Local bytes written:2610106384012,File Systems.HDFS
+ // bytes read:801605644910,File Systems.HDFS bytes
+ // written:44135800,
+ // Job Counters .Launched map tasks:5912,Job Counters .Launched
+ // reduce tasks:739,Job Counters .Data-local map tasks:5573,Job
+ // Counters .Rack-local map tasks:316,Map-Reduce Framework.
+ // Map input records:9410696067,Map-Reduce Framework.Map output
+ // records:9410696067,Map-Reduce Framework.Map input
+ // bytes:801599188816,Map-Reduce Framework.Map output
+ // bytes:784427968116,
+ // Map-Reduce Framework.Combine input records:0,Map-Reduce
+ // Framework.Combine output records:0,Map-Reduce
+ // Framework.Reduce input groups:477265,Map-Reduce
+ // Framework.Reduce input records:739000,
+ // Map-Reduce Framework.Reduce output records:739000"
+
+ record = new ChukwaRecord();
+ key = new ChukwaRecordKey();
+ buildGenericRecord(record, null, Long
+ .parseLong(keys.get("FINISH_TIME")), "MRJob");
+ if (keys.containsKey("COUNTERS")) {
+ extractCounters(record, keys.get("COUNTERS"));
+ }
+
+ key = new ChukwaRecordKey();
+ key.setKey("MRJob/" + keys.get("JOBID"));
+ key.setReduceType("MRJobReduceProcessor");
+
+ record = new ChukwaRecord();
+ record.add(Record.tagsField, chunk.getTags());
+ if (keys.containsKey("SUBMIT_TIME")) {
+ record.setTime(Long.parseLong(keys.get("SUBMIT_TIME")));
+ } else if (keys.containsKey("LAUNCH_TIME")) {
+ record.setTime(Long.parseLong(keys.get("LAUNCH_TIME")));
+ } else if (keys.containsKey("FINISH_TIME")) {
+ record.setTime(Long.parseLong(keys.get("FINISH_TIME")));
+ }
+
+ Iterator<String> it = keys.keySet().iterator();
+ while (it.hasNext()) {
+ String field = it.next();
+ record.add(field, keys.get(field));
+ }
+
+ output.collect(key, record);
+ }
+
+ if (keys.containsKey("TASK_TYPE")
+ && keys.containsKey("COUNTERS")
+ && (keys.get("TASK_TYPE").equalsIgnoreCase("REDUCE") || keys.get(
+ "TASK_TYPE").equalsIgnoreCase("MAP"))) {
+ // MAP
+ // Task TASKID="tip_200804210403_0005_m_000154" TASK_TYPE="MAP"
+ // TASK_STATUS="SUCCESS" FINISH_TIME="1208760463883"
+ // COUNTERS="File Systems.Local bytes read:159265655,File
+ // Systems.Local bytes written:318531310,
+ // File Systems.HDFS bytes read:145882417,Map-Reduce
+ // Framework.Map input records:1706604,
+ // Map-Reduce Framework.Map output records:1706604,Map-Reduce
+ // Framework.Map input bytes:145882057,
+ // Map-Reduce Framework.Map output bytes:142763253,Map-Reduce
+ // Framework.Combine input records:0,Map-Reduce
+ // Framework.Combine output records:0"
+
+ // REDUCE
+ // Task TASKID="tip_200804210403_0005_r_000524"
+ // TASK_TYPE="REDUCE" TASK_STATUS="SUCCESS"
+ // FINISH_TIME="1208760877072"
+ // COUNTERS="File Systems.Local bytes read:1179319677,File
+ // Systems.Local bytes written:1184474889,File Systems.HDFS
+ // bytes written:59021,
+ // Map-Reduce Framework.Reduce input groups:684,Map-Reduce
+ // Framework.Reduce input records:1000,Map-Reduce
+ // Framework.Reduce output records:1000"
+
+ record = new ChukwaRecord();
+ key = new ChukwaRecordKey();
+ buildGenericRecord(record, null, Long
+ .parseLong(keys.get("FINISH_TIME")), "SizeVsFinishTime");
+ extractCounters(record, keys.get("COUNTERS"));
+ record.add("JOBID", keys.get("JOBID"));
+ record.add("TASKID", keys.get("TASKID"));
+ record.add("TASK_TYPE", keys.get("TASK_TYPE"));
+ record.add(Record.tagsField, chunk.getTags());
+ // log.info("MR_Graph +1");
+ output.collect(key, record);
+
+ }
+ } catch (IOException e) {
+ log.warn("Unable to collect output in JobLogHistoryProcessor ["
+ + recordEntry + "]", e);
+ e.printStackTrace();
+ throw e;
+ }
+
+ }
+
+ protected void extractCounters(ChukwaRecord record, String input) {
+
+ String[] data = null;
+ String[] counters = input.split(",");
+
+ for (String counter : counters) {
+ data = counter.split(":");
+ record.add(data[0].replaceAll(" ", "_").replaceAll("\\.", "_")
+ .toUpperCase(), data[1]);
+ }
+ }
+
+ public String getDataType() {
+ return JobLogHistoryProcessor.recordType;
+ }
}
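
As a side note for readers following the diff: the parse() method above peels KEY="value" attributes off a job-history line with the pattern (.*?)=\"(.*?)\"(.*)([\n])?, repeatedly resetting the matcher on group(3), the unparsed tail. The standalone sketch below reproduces that loop outside Chukwa; the class name JobHistoryLineParserSketch is invented for illustration and is not part of this commit.

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JobHistoryLineParserSketch {
  // Same pattern as JobLogHistoryProcessor: group(1) is the attribute
  // name, group(2) its quoted value, group(3) the rest of the line.
  private static final Pattern KV =
      Pattern.compile("(.*?)=\"(.*?)\"(.*)([\\n])?");

  public static Map<String, String> parse(String body) {
    Map<String, String> keys = new HashMap<String, String>();
    Matcher m = KV.matcher(body);
    while (m.matches()) {
      keys.put(m.group(1).trim(), m.group(2).trim());
      m.reset(m.group(3)); // continue on the unparsed remainder
    }
    return keys;
  }

  public static void main(String[] args) {
    String body = " TASK_TYPE=\"MAP\" TASKID=\"tip_200804210403_0005_m_000018\""
        + " START_TIME=\"1208760437531\"";
    // Prints the three parsed pairs (HashMap, so order is unspecified).
    System.out.println(parse(body));
  }
}

The loop terminates once group(3) no longer contains a KEY="value" pair, because matches() then fails on the remainder.
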
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4jJobHistoryProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4jJobHistoryProcessor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4jJobHistoryProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4jJobHistoryProcessor.java Wed Mar 11 22:39:26 2009
@@ -18,11 +18,11 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
import java.io.IOException;
import java.util.Hashtable;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.extraction.engine.Record;
@@ -30,367 +30,359 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
-public class Log4jJobHistoryProcessor extends AbstractProcessor
-{
- static Logger log = Logger.getLogger(Log4jJobHistoryProcessor.class);
-
- private static final String recordType = "JobLogHistory";
- private static String internalRegex = null;
- private static Pattern ip = null;
-
- private Matcher internalMatcher = null;
-
- public Log4jJobHistoryProcessor()
- {
- internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?";
- ip = Pattern.compile(internalRegex);
- internalMatcher = ip.matcher("-");
- }
-
- @Override
- protected void parse(String recordEntry,
- OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
- Reporter reporter)
- throws Throwable
- {
-
-// log.info("JobLogHistoryProcessor record: [" + recordEntry + "] type["
-// + chunk.getDataType() + "]");
-
- try
- {
-
- //String dStr = recordEntry.substring(0, 23);
- int start = 24;
- int idx = recordEntry.indexOf(' ', start);
- // String level = recordEntry.substring(start, idx);
- start = idx + 1;
- idx = recordEntry.indexOf(' ', start);
- // String className = recordEntry.substring(start, idx-1);
- String body = recordEntry.substring(idx + 1);
-
- Hashtable<String, String> keys = new Hashtable<String, String>();
- ChukwaRecord record = null;
-
- int firstSep = body.indexOf(" ");
- keys.put("RECORD_TYPE", body.substring(0, firstSep));
-// log.info("JobLogHistoryProcessor Add field: [RECORD_TYPE]["
-// + keys.get("RECORD_TYPE") + "]");
-
- body = body.substring(firstSep);
-
- internalMatcher.reset(body);
-
-// String fieldName = null;
-// String fieldValue = null;
-
- while (internalMatcher.matches())
- {
-
- keys.put(internalMatcher.group(1).trim(), internalMatcher
- .group(2).trim());
-
- // TODO Remove debug info before production
-// fieldName = internalMatcher.group(1).trim();
-// fieldValue = internalMatcher.group(2).trim();
-// log.info("JobLogHistoryProcessor Add field: [" + fieldName +
-// "][" + fieldValue +"]" );
-// log.info("EOL : [" + internalMatcher.group(3) + "]" );
- internalMatcher.reset(internalMatcher.group(3));
- }
-
- if (!keys.containsKey("JOBID"))
- {
- // Extract JobID from taskID
- // JOBID = "job_200804210403_0005"
- // TASKID = "tip_200804210403_0005_m_000018"
- String jobId = keys.get("TASKID");
- int idx1 = jobId.indexOf('_',0);
- int idx2 = jobId.indexOf('_', idx1+1);
- idx2 = jobId.indexOf('_', idx2+1);
- keys.put("JOBID","job" + jobId.substring(idx1,idx2));
-// log.info("JobLogHistoryProcessor Add field: [JOBID]["
-// + keys.get("JOBID") + "]");
- }
-
- // if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
- // keys.containsKey("SUBMIT_TIME"))
- // {
- // // Job JOBID="job_200804210403_0005" JOBNAME="MY_JOB"
- // USER="userxxx"
- // // SUBMIT_TIME="1208760436751"
- // JOBCONF="/mapredsystem/xxx.yyy.com/job_200804210403_0005/job.xml"
- //
- //
- // }
- // else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
- // keys.containsKey("LAUNCH_TIME"))
- // {
- // // Job JOBID="job_200804210403_0005" LAUNCH_TIME="1208760437110"
- // TOTAL_MAPS="5912" TOTAL_REDUCES="739"
- //
- // }
- // else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
- // keys.containsKey("FINISH_TIME"))
- // {
- // // Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906816"
- // JOB_STATUS="SUCCESS" FINISHED_MAPS="5912" FINISHED_REDUCES="739"
- // FAILED_MAPS="0" FAILED_REDUCES="0"
- // // COUNTERS="File Systems.Local bytes read:1735053407244,File
- // Systems.Local bytes written:2610106384012,File Systems.HDFS bytes
- // read:801605644910,File Systems.HDFS bytes written:44135800,
- // // Job Counters .Launched map tasks:5912,Job Counters .Launched
- // reduce tasks:739,Job Counters .Data-local map tasks:5573,Job
- // Counters .Rack-local map tasks:316,Map-Reduce Framework.
- // // Map input records:9410696067,Map-Reduce Framework.Map output
- // records:9410696067,Map-Reduce Framework.Map input
- // bytes:801599188816,Map-Reduce Framework.Map output
- // bytes:784427968116,
- // // Map-Reduce Framework.Combine input records:0,Map-Reduce
- // Framework.Combine output records:0,Map-Reduce Framework.Reduce
- // input groups:477265,Map-Reduce Framework.Reduce input
- // records:739000,
- // // Map-Reduce Framework.Reduce output records:739000"
- //
- // }
- // else
- if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
- && keys.containsKey("START_TIME"))
- {
- // MapAttempt TASK_TYPE="MAP"
- // TASKID="tip_200804210403_0005_m_000018"
- // TASK_ATTEMPT_ID="task_200804210403_0005_m_000018_0"
- // START_TIME="1208760437531"
- // HOSTNAME="tracker_xxx.yyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:53734"
-
- key = new ChukwaRecordKey();
- key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/" + keys.get("START_TIME"));
- key.setReduceType("JobLogHistoryReduceProcessor");
- record = new ChukwaRecord();
- record.setTime(Long.parseLong(keys.get("START_TIME")));
- record.add("JOBID",keys.get("JOBID"));
- record.add("START_TIME", keys.get("START_TIME"));
- record.add(Record.tagsField, chunk.getTags());
-// log.info("JobLogHist/Map/S");
- output.collect(key, record);
-
- } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
- && keys.containsKey("FINISH_TIME"))
- {
- // MapAttempt TASK_TYPE="MAP"
- // TASKID="tip_200804210403_0005_m_005494"
- // TASK_ATTEMPT_ID="task_200804210403_0005_m_005494_0"
- // TASK_STATUS="SUCCESS"
- // FINISH_TIME="1208760624124"
- // HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:55491"
-
- key = new ChukwaRecordKey();
- key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/" + keys.get("FINISH_TIME"));
- key.setReduceType("JobLogHistoryReduceProcessor");
- record = new ChukwaRecord();
- record.setTime(Long.parseLong(keys.get("FINISH_TIME")));
- record.add("JOBID",keys.get("JOBID"));
- record.add("FINISH_TIME", keys.get("FINISH_TIME"));
- record.add(Record.tagsField, chunk.getTags());
-// log.info("JobLogHist/Map/E");
- output.collect(key, record);
- }
-
- else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt")
- && keys.containsKey("START_TIME"))
- {
- // ReduceAttempt TASK_TYPE="REDUCE"
- // TASKID="tip_200804210403_0005_r_000138"
- // TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0"
- // START_TIME="1208760454885"
- // HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947"
-
- key = new ChukwaRecordKey();
- key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/" + keys.get("START_TIME"));
- key.setReduceType("JobLogHistoryReduceProcessor");
- record = new ChukwaRecord();
- record.setTime(Long.parseLong(keys.get("START_TIME")));
- record.add("JOBID",keys.get("JOBID"));
- record.add("START_TIME", keys.get("START_TIME"));
- record.add(Record.tagsField, chunk.getTags());
-// log.info("JobLogHist/SHUFFLE/S");
- output.collect(key, record);
-
- } else if (keys.get("RECORD_TYPE")
- .equalsIgnoreCase("ReduceAttempt")
- && keys.containsKey("FINISH_TIME"))
- {
- // ReduceAttempt TASK_TYPE="REDUCE"
- // TASKID="tip_200804210403_0005_r_000138"
- // TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0"
- // TASK_STATUS="SUCCESS" SHUFFLE_FINISHED="1208760787167"
- // SORT_FINISHED="1208760787354" FINISH_TIME="1208760802395"
- // HOSTNAME="tracker__xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947"
-
- key = new ChukwaRecordKey();
- key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/" + keys.get("SHUFFLE_FINISHED"));
- key.setReduceType("JobLogHistoryReduceProcessor");
- record = new ChukwaRecord();
- record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
- record.add("JOBID",keys.get("JOBID"));
- record.add("SHUFFLE_FINISHED", keys.get("SHUFFLE_FINISHED"));
- record.add(Record.tagsField, chunk.getTags());
-// log.info("JobLogHist/SHUFFLE/E");
- output.collect(key, record);
-
- // SORT
- key = new ChukwaRecordKey();
- key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/" + keys.get("SHUFFLE_FINISHED"));
- key.setReduceType("JobLogHistoryReduceProcessor");
- record = new ChukwaRecord();
- record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
- record.add("JOBID",keys.get("JOBID"));
- record.add("START_TIME", keys.get("SHUFFLE_FINISHED"));
- record.add(Record.tagsField, chunk.getTags());
-// log.info("JobLogHist/SORT/S");
- output.collect(key, record);
-
- key = new ChukwaRecordKey();
- key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/" + keys.get("SORT_FINISHED"));
- key.setReduceType("JobLogHistoryReduceProcessor");
- record = new ChukwaRecord();
- record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
- record.add("JOBID",keys.get("JOBID"));
- record.add("SORT_FINISHED", keys.get("SORT_FINISHED"));
- record.add(Record.tagsField, chunk.getTags());
-// log.info("JobLogHist/SORT/E");
- output.collect(key, record);
-
- // Reduce
- key = new ChukwaRecordKey();
- key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/" + keys.get("SORT_FINISHED"));
- key.setReduceType("JobLogHistoryReduceProcessor");
- record = new ChukwaRecord();
- record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
- record.add("JOBID",keys.get("JOBID"));
- record.add("START_TIME", keys.get("SORT_FINISHED"));
- record.add(Record.tagsField, chunk.getTags());
-// log.info("JobLogHist/REDUCE/S");
- output.collect(key, record);
-
- key = new ChukwaRecordKey();
- key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/" + keys.get("FINISH_TIME"));
- key.setReduceType("JobLogHistoryReduceProcessor");
- record = new ChukwaRecord();
- record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
- record.add("JOBID",keys.get("JOBID"));
- record.add("FINISH_TIME", keys.get("SORT_FINISHED"));
- record.add(Record.tagsField, chunk.getTags());
-// log.info("JobLogHist/REDUCE/E");
- output.collect(key, record);
-
- } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job")
- && keys.containsKey("COUNTERS"))
- {
- // Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906816"
- // JOB_STATUS="SUCCESS" FINISHED_MAPS="5912"
- // FINISHED_REDUCES="739" FAILED_MAPS="0" FAILED_REDUCES="0"
- // COUNTERS="File Systems.Local bytes read:1735053407244,File
- // Systems.Local bytes written:2610106384012,File Systems.HDFS
- // bytes read:801605644910,File Systems.HDFS bytes
- // written:44135800,
- // Job Counters .Launched map tasks:5912,Job Counters .Launched
- // reduce tasks:739,Job Counters .Data-local map tasks:5573,Job
- // Counters .Rack-local map tasks:316,Map-Reduce Framework.
- // Map input records:9410696067,Map-Reduce Framework.Map output
- // records:9410696067,Map-Reduce Framework.Map input
- // bytes:801599188816,Map-Reduce Framework.Map output
- // bytes:784427968116,
- // Map-Reduce Framework.Combine input records:0,Map-Reduce
- // Framework.Combine output records:0,Map-Reduce
- // Framework.Reduce input groups:477265,Map-Reduce
- // Framework.Reduce input records:739000,
- // Map-Reduce Framework.Reduce output records:739000"
-
- record = new ChukwaRecord();
- key = new ChukwaRecordKey();
- buildGenericRecord(record, null, Long.parseLong(keys.get("FINISH_TIME")), "MRJobCounters");
- extractCounters(record, keys.get("COUNTERS"));
-
- String jobId = keys.get("JOBID").replace("_", "").substring(3);
- record.add("JobId", "" + jobId);
-
- // FIXME validate this when HodId will be available
- if (keys.containsKey("HODID"))
- { record.add("HodId", keys.get("HODID")); }
-
-// log.info("MRJobCounters +1");
- output.collect(key, record);
- }
-
- if (keys.containsKey("TASK_TYPE")
- && keys.containsKey("COUNTERS")
- && (keys.get("TASK_TYPE").equalsIgnoreCase("REDUCE") || keys
- .get("TASK_TYPE").equalsIgnoreCase("MAP")))
- {
- // MAP
- // Task TASKID="tip_200804210403_0005_m_000154" TASK_TYPE="MAP"
- // TASK_STATUS="SUCCESS" FINISH_TIME="1208760463883"
- // COUNTERS="File Systems.Local bytes read:159265655,File
- // Systems.Local bytes written:318531310,
- // File Systems.HDFS bytes read:145882417,Map-Reduce
- // Framework.Map input records:1706604,
- // Map-Reduce Framework.Map output records:1706604,Map-Reduce
- // Framework.Map input bytes:145882057,
- // Map-Reduce Framework.Map output bytes:142763253,Map-Reduce
- // Framework.Combine input records:0,Map-Reduce
- // Framework.Combine output records:0"
-
- // REDUCE
- // Task TASKID="tip_200804210403_0005_r_000524"
- // TASK_TYPE="REDUCE" TASK_STATUS="SUCCESS"
- // FINISH_TIME="1208760877072"
- // COUNTERS="File Systems.Local bytes read:1179319677,File
- // Systems.Local bytes written:1184474889,File Systems.HDFS
- // bytes written:59021,
- // Map-Reduce Framework.Reduce input groups:684,Map-Reduce
- // Framework.Reduce input records:1000,Map-Reduce
- // Framework.Reduce output records:1000"
-
- record = new ChukwaRecord();
- key = new ChukwaRecordKey();
- buildGenericRecord(record, null, Long.parseLong(keys.get("FINISH_TIME")), "SizeVsFinishTime");
- extractCounters(record, keys.get("COUNTERS"));
- record.add("JOBID",keys.get("JOBID"));
- record.add("TASKID", keys.get("TASKID"));
- record.add("TASK_TYPE", keys.get("TASK_TYPE"));
-
-// log.info("MR_Graph +1");
- output.collect(key, record);
-
- }
- }
- catch (IOException e)
- {
- log.warn("Unable to collect output in JobLogHistoryProcessor ["
- + recordEntry + "]", e);
- e.printStackTrace();
- throw e;
- }
-
- }
-
- protected void extractCounters(ChukwaRecord record, String input)
- {
-
- String[] data = null;
- String[] counters = input.split(",");
-
- for (String counter : counters)
- {
- data = counter.split(":");
- record.add(data[0].replaceAll(" ", "_").replaceAll("\\.", "_")
- .toUpperCase(), data[1]);
- }
- }
-
- public String getDataType()
- {
- return Log4jJobHistoryProcessor.recordType;
- }
+public class Log4jJobHistoryProcessor extends AbstractProcessor {
+ static Logger log = Logger.getLogger(Log4jJobHistoryProcessor.class);
+
+ private static final String recordType = "JobLogHistory";
+ private static String internalRegex = null;
+ private static Pattern ip = null;
+
+ private Matcher internalMatcher = null;
+
+ public Log4jJobHistoryProcessor() {
+ internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?";
+ ip = Pattern.compile(internalRegex);
+ internalMatcher = ip.matcher("-");
+ }
+
+ @Override
+ protected void parse(String recordEntry,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+ throws Throwable {
+
+ // log.info("JobLogHistoryProcessor record: [" + recordEntry + "] type["
+ // + chunk.getDataType() + "]");
+
+ try {
+
+ // String dStr = recordEntry.substring(0, 23);
+ int start = 24;
+ int idx = recordEntry.indexOf(' ', start);
+ // String level = recordEntry.substring(start, idx);
+ start = idx + 1;
+ idx = recordEntry.indexOf(' ', start);
+ // String className = recordEntry.substring(start, idx-1);
+ String body = recordEntry.substring(idx + 1);
+
+ Hashtable<String, String> keys = new Hashtable<String, String>();
+ ChukwaRecord record = null;
+
+ int firstSep = body.indexOf(" ");
+ keys.put("RECORD_TYPE", body.substring(0, firstSep));
+ // log.info("JobLogHistoryProcessor Add field: [RECORD_TYPE]["
+ // + keys.get("RECORD_TYPE") + "]");
+
+ body = body.substring(firstSep);
+
+ internalMatcher.reset(body);
+
+ // String fieldName = null;
+ // String fieldValue = null;
+
+ while (internalMatcher.matches()) {
+
+ keys.put(internalMatcher.group(1).trim(), internalMatcher.group(2)
+ .trim());
+
+ // TODO Remove debug info before production
+ // fieldName = internalMatcher.group(1).trim();
+ // fieldValue = internalMatcher.group(2).trim();
+ // log.info("JobLogHistoryProcessor Add field: [" + fieldName +
+ // "][" + fieldValue +"]" );
+ // log.info("EOL : [" + internalMatcher.group(3) + "]" );
+ internalMatcher.reset(internalMatcher.group(3));
+ }
+
+ if (!keys.containsKey("JOBID")) {
+ // Extract JobID from taskID
+ // JOBID = "job_200804210403_0005"
+ // TASKID = "tip_200804210403_0005_m_000018"
+ String jobId = keys.get("TASKID");
+ int idx1 = jobId.indexOf('_', 0);
+ int idx2 = jobId.indexOf('_', idx1 + 1);
+ idx2 = jobId.indexOf('_', idx2 + 1);
+ keys.put("JOBID", "job" + jobId.substring(idx1, idx2));
+ // log.info("JobLogHistoryProcessor Add field: [JOBID]["
+ // + keys.get("JOBID") + "]");
+ }
+
+ // if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
+ // keys.containsKey("SUBMIT_TIME"))
+ // {
+ // // Job JOBID="job_200804210403_0005" JOBNAME="MY_JOB"
+ // USER="userxxx"
+ // // SUBMIT_TIME="1208760436751"
+ // JOBCONF="/mapredsystem/xxx.yyy.com/job_200804210403_0005/job.xml"
+ //
+ //
+ // }
+ // else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
+ // keys.containsKey("LAUNCH_TIME"))
+ // {
+ // // Job JOBID="job_200804210403_0005" LAUNCH_TIME="1208760437110"
+ // TOTAL_MAPS="5912" TOTAL_REDUCES="739"
+ //
+ // }
+ // else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
+ // keys.containsKey("FINISH_TIME"))
+ // {
+ // // Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906816"
+ // JOB_STATUS="SUCCESS" FINISHED_MAPS="5912" FINISHED_REDUCES="739"
+ // FAILED_MAPS="0" FAILED_REDUCES="0"
+ // // COUNTERS="File Systems.Local bytes read:1735053407244,File
+ // Systems.Local bytes written:2610106384012,File Systems.HDFS bytes
+ // read:801605644910,File Systems.HDFS bytes written:44135800,
+ // // Job Counters .Launched map tasks:5912,Job Counters .Launched
+ // reduce tasks:739,Job Counters .Data-local map tasks:5573,Job
+ // Counters .Rack-local map tasks:316,Map-Reduce Framework.
+ // // Map input records:9410696067,Map-Reduce Framework.Map output
+ // records:9410696067,Map-Reduce Framework.Map input
+ // bytes:801599188816,Map-Reduce Framework.Map output
+ // bytes:784427968116,
+ // // Map-Reduce Framework.Combine input records:0,Map-Reduce
+ // Framework.Combine output records:0,Map-Reduce Framework.Reduce
+ // input groups:477265,Map-Reduce Framework.Reduce input
+ // records:739000,
+ // // Map-Reduce Framework.Reduce output records:739000"
+ //
+ // }
+ // else
+ if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
+ && keys.containsKey("START_TIME")) {
+ // MapAttempt TASK_TYPE="MAP"
+ // TASKID="tip_200804210403_0005_m_000018"
+ // TASK_ATTEMPT_ID="task_200804210403_0005_m_000018_0"
+ // START_TIME="1208760437531"
+ // HOSTNAME="tracker_xxx.yyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:53734"
+
+ key = new ChukwaRecordKey();
+ key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/"
+ + keys.get("START_TIME"));
+ key.setReduceType("JobLogHistoryReduceProcessor");
+ record = new ChukwaRecord();
+ record.setTime(Long.parseLong(keys.get("START_TIME")));
+ record.add("JOBID", keys.get("JOBID"));
+ record.add("START_TIME", keys.get("START_TIME"));
+ record.add(Record.tagsField, chunk.getTags());
+ // log.info("JobLogHist/Map/S");
+ output.collect(key, record);
+
+ } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
+ && keys.containsKey("FINISH_TIME")) {
+ // MapAttempt TASK_TYPE="MAP"
+ // TASKID="tip_200804210403_0005_m_005494"
+ // TASK_ATTEMPT_ID="task_200804210403_0005_m_005494_0"
+ // TASK_STATUS="SUCCESS"
+ // FINISH_TIME="1208760624124"
+ // HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:55491"
+
+ key = new ChukwaRecordKey();
+ key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/"
+ + keys.get("FINISH_TIME"));
+ key.setReduceType("JobLogHistoryReduceProcessor");
+ record = new ChukwaRecord();
+ record.setTime(Long.parseLong(keys.get("FINISH_TIME")));
+ record.add("JOBID", keys.get("JOBID"));
+ record.add("FINISH_TIME", keys.get("FINISH_TIME"));
+ record.add(Record.tagsField, chunk.getTags());
+ // log.info("JobLogHist/Map/E");
+ output.collect(key, record);
+ }
+
+ else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt")
+ && keys.containsKey("START_TIME")) {
+ // ReduceAttempt TASK_TYPE="REDUCE"
+ // TASKID="tip_200804210403_0005_r_000138"
+ // TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0"
+ // START_TIME="1208760454885"
+ // HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947"
+
+ key = new ChukwaRecordKey();
+ key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/"
+ + keys.get("START_TIME"));
+ key.setReduceType("JobLogHistoryReduceProcessor");
+ record = new ChukwaRecord();
+ record.setTime(Long.parseLong(keys.get("START_TIME")));
+ record.add("JOBID", keys.get("JOBID"));
+ record.add("START_TIME", keys.get("START_TIME"));
+ record.add(Record.tagsField, chunk.getTags());
+ // log.info("JobLogHist/SHUFFLE/S");
+ output.collect(key, record);
+
+ } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt")
+ && keys.containsKey("FINISH_TIME")) {
+ // ReduceAttempt TASK_TYPE="REDUCE"
+ // TASKID="tip_200804210403_0005_r_000138"
+ // TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0"
+ // TASK_STATUS="SUCCESS" SHUFFLE_FINISHED="1208760787167"
+ // SORT_FINISHED="1208760787354" FINISH_TIME="1208760802395"
+ // HOSTNAME="tracker__xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947"
+
+ key = new ChukwaRecordKey();
+ key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/"
+ + keys.get("SHUFFLE_FINISHED"));
+ key.setReduceType("JobLogHistoryReduceProcessor");
+ record = new ChukwaRecord();
+ record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
+ record.add("JOBID", keys.get("JOBID"));
+ record.add("SHUFFLE_FINISHED", keys.get("SHUFFLE_FINISHED"));
+ record.add(Record.tagsField, chunk.getTags());
+ // log.info("JobLogHist/SHUFFLE/E");
+ output.collect(key, record);
+
+ // SORT
+ key = new ChukwaRecordKey();
+ key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/"
+ + keys.get("SHUFFLE_FINISHED"));
+ key.setReduceType("JobLogHistoryReduceProcessor");
+ record = new ChukwaRecord();
+ record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
+ record.add("JOBID", keys.get("JOBID"));
+ record.add("START_TIME", keys.get("SHUFFLE_FINISHED"));
+ record.add(Record.tagsField, chunk.getTags());
+ // log.info("JobLogHist/SORT/S");
+ output.collect(key, record);
+
+ key = new ChukwaRecordKey();
+ key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/"
+ + keys.get("SORT_FINISHED"));
+ key.setReduceType("JobLogHistoryReduceProcessor");
+ record = new ChukwaRecord();
+ record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
+ record.add("JOBID", keys.get("JOBID"));
+ record.add("SORT_FINISHED", keys.get("SORT_FINISHED"));
+ record.add(Record.tagsField, chunk.getTags());
+ // log.info("JobLogHist/SORT/E");
+ output.collect(key, record);
+
+ // Reduce
+ key = new ChukwaRecordKey();
+ key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/"
+ + keys.get("SORT_FINISHED"));
+ key.setReduceType("JobLogHistoryReduceProcessor");
+ record = new ChukwaRecord();
+ record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
+ record.add("JOBID", keys.get("JOBID"));
+ record.add("START_TIME", keys.get("SORT_FINISHED"));
+ record.add(Record.tagsField, chunk.getTags());
+ // log.info("JobLogHist/REDUCE/S");
+ output.collect(key, record);
+
+ key = new ChukwaRecordKey();
+ key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/"
+ + keys.get("FINISH_TIME"));
+ key.setReduceType("JobLogHistoryReduceProcessor");
+ record = new ChukwaRecord();
+ record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
+ record.add("JOBID", keys.get("JOBID"));
+ record.add("FINISH_TIME", keys.get("SORT_FINISHED"));
+ record.add(Record.tagsField, chunk.getTags());
+ // log.info("JobLogHist/REDUCE/E");
+ output.collect(key, record);
+
+ } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job")
+ && keys.containsKey("COUNTERS")) {
+ // Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906816"
+ // JOB_STATUS="SUCCESS" FINISHED_MAPS="5912"
+ // FINISHED_REDUCES="739" FAILED_MAPS="0" FAILED_REDUCES="0"
+ // COUNTERS="File Systems.Local bytes read:1735053407244,File
+ // Systems.Local bytes written:2610106384012,File Systems.HDFS
+ // bytes read:801605644910,File Systems.HDFS bytes
+ // written:44135800,
+ // Job Counters .Launched map tasks:5912,Job Counters .Launched
+ // reduce tasks:739,Job Counters .Data-local map tasks:5573,Job
+ // Counters .Rack-local map tasks:316,Map-Reduce Framework.
+ // Map input records:9410696067,Map-Reduce Framework.Map output
+ // records:9410696067,Map-Reduce Framework.Map input
+ // bytes:801599188816,Map-Reduce Framework.Map output
+ // bytes:784427968116,
+ // Map-Reduce Framework.Combine input records:0,Map-Reduce
+ // Framework.Combine output records:0,Map-Reduce
+ // Framework.Reduce input groups:477265,Map-Reduce
+ // Framework.Reduce input records:739000,
+ // Map-Reduce Framework.Reduce output records:739000"
+
+ record = new ChukwaRecord();
+ key = new ChukwaRecordKey();
+ buildGenericRecord(record, null, Long
+ .parseLong(keys.get("FINISH_TIME")), "MRJobCounters");
+ extractCounters(record, keys.get("COUNTERS"));
+
+ String jobId = keys.get("JOBID").replace("_", "").substring(3);
+ record.add("JobId", "" + jobId);
+
+ // FIXME validate this when HodId will be available
+ if (keys.containsKey("HODID")) {
+ record.add("HodId", keys.get("HODID"));
+ }
+
+ // log.info("MRJobCounters +1");
+ output.collect(key, record);
+ }
+
+ if (keys.containsKey("TASK_TYPE")
+ && keys.containsKey("COUNTERS")
+ && (keys.get("TASK_TYPE").equalsIgnoreCase("REDUCE") || keys.get(
+ "TASK_TYPE").equalsIgnoreCase("MAP"))) {
+ // MAP
+ // Task TASKID="tip_200804210403_0005_m_000154" TASK_TYPE="MAP"
+ // TASK_STATUS="SUCCESS" FINISH_TIME="1208760463883"
+ // COUNTERS="File Systems.Local bytes read:159265655,File
+ // Systems.Local bytes written:318531310,
+ // File Systems.HDFS bytes read:145882417,Map-Reduce
+ // Framework.Map input records:1706604,
+ // Map-Reduce Framework.Map output records:1706604,Map-Reduce
+ // Framework.Map input bytes:145882057,
+ // Map-Reduce Framework.Map output bytes:142763253,Map-Reduce
+ // Framework.Combine input records:0,Map-Reduce
+ // Framework.Combine output records:0"
+
+ // REDUCE
+ // Task TASKID="tip_200804210403_0005_r_000524"
+ // TASK_TYPE="REDUCE" TASK_STATUS="SUCCESS"
+ // FINISH_TIME="1208760877072"
+ // COUNTERS="File Systems.Local bytes read:1179319677,File
+ // Systems.Local bytes written:1184474889,File Systems.HDFS
+ // bytes written:59021,
+ // Map-Reduce Framework.Reduce input groups:684,Map-Reduce
+ // Framework.Reduce input records:1000,Map-Reduce
+ // Framework.Reduce output records:1000"
+
+ record = new ChukwaRecord();
+ key = new ChukwaRecordKey();
+ buildGenericRecord(record, null, Long
+ .parseLong(keys.get("FINISH_TIME")), "SizeVsFinishTime");
+ extractCounters(record, keys.get("COUNTERS"));
+ record.add("JOBID", keys.get("JOBID"));
+ record.add("TASKID", keys.get("TASKID"));
+ record.add("TASK_TYPE", keys.get("TASK_TYPE"));
+
+ // log.info("MR_Graph +1");
+ output.collect(key, record);
+
+ }
+ } catch (IOException e) {
+ log.warn("Unable to collect output in JobLogHistoryProcessor ["
+ + recordEntry + "]", e);
+ e.printStackTrace();
+ throw e;
+ }
+
+ }
+
+ protected void extractCounters(ChukwaRecord record, String input) {
+
+ String[] data = null;
+ String[] counters = input.split(",");
+
+ for (String counter : counters) {
+ data = counter.split(":");
+ record.add(data[0].replaceAll(" ", "_").replaceAll("\\.", "_")
+ .toUpperCase(), data[1]);
+ }
+ }
+
+ public String getDataType() {
+ return JobLogHistoryProcessor.recordType;
+ }
}
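
A note on the COUNTERS handling above: the COUNTERS attribute in a Job or Task line is one flat string of comma-separated "group.counter:value" entries, and extractCounters() turns each entry into a record field by splitting on "," then ":", replacing spaces and dots in the name with underscores, and uppercasing. A minimal standalone sketch of the same normalization (hypothetical CounterParser class, sample entries taken from the MAP comment above):

    import java.util.HashMap;
    import java.util.Map;

    public class CounterParser {
      // Same split/normalize steps as extractCounters(), minus ChukwaRecord.
      public static Map<String, String> parse(String input) {
        Map<String, String> fields = new HashMap<String, String>();
        for (String counter : input.split(",")) {
          String[] data = counter.split(":");
          fields.put(data[0].replaceAll(" ", "_").replaceAll("\\.", "_")
              .toUpperCase(), data[1]);
        }
        return fields;
      }

      public static void main(String[] args) {
        String counters = "File Systems.HDFS bytes read:145882417,"
            + "Map-Reduce Framework.Map input records:1706604";
        // Prints FILE_SYSTEMS_HDFS_BYTES_READ=145882417 and
        // MAP-REDUCE_FRAMEWORK_MAP_INPUT_RECORDS=1706604.
        System.out.println(CounterParser.parse(counters));
      }
    }

An entry whose value contained a stray comma or colon would be mis-split; the sample counter strings above contain neither.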
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessor.java Wed Mar 11 22:39:26 2009
@@ -18,6 +18,7 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
@@ -25,7 +26,7 @@
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
-public interface MapProcessor
-{
- public void process(ChukwaArchiveKey archiveKey,Chunk chunk,OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter);
+public interface MapProcessor {
+ public void process(ChukwaArchiveKey archiveKey, Chunk chunk,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter);
}
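
MapProcessor is the per-chunk hook that Demux invokes for each data type: an implementation reads one Chunk and emits ChukwaRecordKey/ChukwaRecord pairs through the collector. A bare-bones illustration of the contract (hypothetical EchoProcessor, not part of this commit; assumes the Chunk accessors getDataType(), getSeqID() and getData() used elsewhere in Chukwa):

    package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;

    import org.apache.hadoop.chukwa.ChukwaArchiveKey;
    import org.apache.hadoop.chukwa.Chunk;
    import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
    import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.log4j.Logger;

    // Hypothetical example: copies each chunk body into a single record.
    public class EchoProcessor implements MapProcessor {
      static Logger log = Logger.getLogger(EchoProcessor.class);

      public void process(ChukwaArchiveKey archiveKey, Chunk chunk,
          OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
          Reporter reporter) {
        try {
          ChukwaRecordKey key = new ChukwaRecordKey();
          key.setKey("Echo/" + chunk.getDataType() + "/" + chunk.getSeqID());
          key.setReduceType("IdentityReducer"); // assumed reduce-type label
          ChukwaRecord record = new ChukwaRecord();
          record.setTime(System.currentTimeMillis());
          record.add("body", new String(chunk.getData()));
          output.collect(key, record);
        } catch (Exception e) {
          // process() declares no exceptions, so failures are logged here.
          log.warn("EchoProcessor failed on chunk", e);
        }
      }
    }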
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessorFactory.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessorFactory.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessorFactory.java Wed Mar 11 22:39:26 2009
@@ -18,49 +18,38 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
-import java.util.HashMap;
+import java.util.HashMap;
import org.apache.log4j.Logger;
+public class MapProcessorFactory {
+ static Logger log = Logger.getLogger(MapProcessorFactory.class);
+
+ private static HashMap<String, MapProcessor> processors = new HashMap<String, MapProcessor>(); // registry
+ private MapProcessorFactory() {
+ }
-public class MapProcessorFactory
-{
- static Logger log = Logger.getLogger(MapProcessorFactory.class);
-
- private static HashMap<String,MapProcessor > processors =
- new HashMap<String, MapProcessor>(); // registry
-
- private MapProcessorFactory()
- {}
-
- public static MapProcessor getProcessor(String parserClass)
- throws UnknownRecordTypeException
- {
- if (processors.containsKey(parserClass))
- {
- return processors.get(parserClass);
- }
- else
- {
- MapProcessor processor = null;
- try
- {
- processor = (MapProcessor)Class.forName(parserClass).getConstructor().newInstance();
- }
- catch(ClassNotFoundException e)
- {
- throw new UnknownRecordTypeException("Unknown parserClass:" + parserClass, e);
- }
- catch(Exception e)
- {
- throw new UnknownRecordTypeException("error constructing processor", e);
- }
-
- //TODO using a ThreadSafe/reuse flag to actually decide if we want
- // to reuse the same processor again and again
- processors.put(parserClass,processor);
- return processor;
- }
- }
+ public static MapProcessor getProcessor(String parserClass)
+ throws UnknownRecordTypeException {
+ if (processors.containsKey(parserClass)) {
+ return processors.get(parserClass);
+ } else {
+ MapProcessor processor = null;
+ try {
+ processor = (MapProcessor) Class.forName(parserClass).getConstructor()
+ .newInstance();
+ } catch (ClassNotFoundException e) {
+ throw new UnknownRecordTypeException("Unknown parserClass:"
+ + parserClass, e);
+ } catch (Exception e) {
+ throw new UnknownRecordTypeException("error constructing processor", e);
+ }
+
+ // TODO use a ThreadSafe/reuse flag to decide whether we want
+ // to reuse the same processor again and again
+ processors.put(parserClass, processor);
+ return processor;
+ }
+ }
}
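
The factory instantiates one MapProcessor per parser class via reflection on first use and caches it in the static HashMap, so later lookups are cheap but every map task in the JVM shares a single instance; that sharing is what the ThreadSafe/reuse TODO is about. A hedged usage sketch, roughly what a Demux map task does per chunk (the dispatch() wrapper and the DefaultProcessor class name are assumptions, not taken from this commit):

    package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;

    import org.apache.hadoop.chukwa.ChukwaArchiveKey;
    import org.apache.hadoop.chukwa.Chunk;
    import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
    import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.log4j.Logger;

    public class DispatchSketch {
      static Logger log = Logger.getLogger(DispatchSketch.class);

      // Illustrative per-chunk dispatch through the cached registry.
      void dispatch(ChukwaArchiveKey archiveKey, Chunk chunk,
          OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
          Reporter reporter) {
        String parserClass = "org.apache.hadoop.chukwa.extraction.demux"
            + ".processor.mapper.DefaultProcessor"; // assumed class name
        try {
          MapProcessor processor = MapProcessorFactory.getProcessor(parserClass);
          processor.process(archiveKey, chunk, output, reporter);
        } catch (UnknownRecordTypeException e) {
          // Unknown or unconstructable parser class: skip this chunk.
          log.warn("no processor for " + parserClass, e);
        }
      }
    }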
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsInvalidEntry.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsInvalidEntry.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsInvalidEntry.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsInvalidEntry.java Wed Mar 11 22:39:26 2009
@@ -18,30 +18,27 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
-public class PbsInvalidEntry extends Exception
-{
- /**
+public class PbsInvalidEntry extends Exception {
+
+ /**
*
*/
- private static final long serialVersionUID = 9154096600390233023L;
+ private static final long serialVersionUID = 9154096600390233023L;
- public PbsInvalidEntry()
- {}
+ public PbsInvalidEntry() {
+ }
- public PbsInvalidEntry(String message)
- {
- super(message);
- }
-
- public PbsInvalidEntry(Throwable cause)
- {
- super(cause);
- }
-
- public PbsInvalidEntry(String message, Throwable cause)
- {
- super(message, cause);
- }
+ public PbsInvalidEntry(String message) {
+ super(message);
+ }
+
+ public PbsInvalidEntry(Throwable cause) {
+ super(cause);
+ }
+
+ public PbsInvalidEntry(String message, Throwable cause) {
+ super(message, cause);
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsNodes.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsNodes.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsNodes.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsNodes.java Wed Mar 11 22:39:26 2009
@@ -18,215 +18,181 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
-
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
-public class PbsNodes extends AbstractProcessor
-{
- static Logger log = Logger.getLogger(PbsNodes.class);
-
- private static final String rawPBSRecordType = "PbsNodes";
- private static final String machinePBSRecordType = "MachinePbsNodes";
- private SimpleDateFormat sdf = null;
-
- public PbsNodes()
- {
- //TODO move that to config
- sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
- }
-
- @Override
- protected void parse(String recordEntry, OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
- Reporter reporter)
- throws Throwable
- {
-
-// log.info("PbsNodeProcessor record: [" + recordEntry + "] type[" + chunk.getDataType() + "]");
-
-
-
- StringBuilder sb = new StringBuilder();
- int i = 0;
- String nodeActivityStatus = null;
- StringBuilder sbFreeMachines = new StringBuilder();
- StringBuilder sbUsedMachines = new StringBuilder();
- StringBuilder sbDownMachines = new StringBuilder();
-
- int totalFreeNode = 0;
- int totalUsedNode = 0;
- int totalDownNode = 0;
-
- String body = null;
- ChukwaRecord record = null;
-
-
- try
- {
-
- String dStr = recordEntry.substring(0, 23);
- int start = 24;
- int idx = recordEntry.indexOf(' ', start);
- //String level = recordEntry.substring(start, idx);
- start = idx+1;
- idx = recordEntry.indexOf(' ', start );
- //String className = recordEntry.substring(start, idx-1);
- body = recordEntry.substring(idx+1);
-
-
- Date d = sdf.parse(dStr );
-
- String[] lines = body.split("\n");
- while (i < lines.length)
- {
- while ((i < lines.length) && (lines[i].trim().length() > 0))
- {
- sb.append(lines[i].trim()).append("\n");
- i++;
- }
-
- if ( (i< lines.length) && (lines[i].trim().length() > 0) )
- {
- throw new PbsInvalidEntry(recordEntry);
- }
-
- // Empty line
- i++;
-
- if (sb.length() > 0)
- {
- body = sb.toString();
- // Process all entries for a machine
- //System.out.println("=========>>> Record [" + body+ "]");
-
- record = new ChukwaRecord();
- key = new ChukwaRecordKey();
-
- buildGenericRecord(record,null,d.getTime(),machinePBSRecordType);
- parsePbsRecord(body, record);
-
- //Output PbsNode record for 1 machine
- output.collect(key, record);
- //log.info("PbsNodeProcessor output 1 sub-record");
-
- //compute Node Activity information
- nodeActivityStatus = record.getValue("state");
- if (nodeActivityStatus != null)
- {
- if (nodeActivityStatus.equals("free"))
- {
- totalFreeNode ++;
- sbFreeMachines.append(record.getValue("Machine")).append(",");
- }
- else if (nodeActivityStatus.equals("job-exclusive"))
- {
- totalUsedNode ++;
- sbUsedMachines.append(record.getValue("Machine")).append(",");
- }
- else
- {
- totalDownNode ++;
- sbDownMachines.append(record.getValue("Machine")).append(",");
- }
- }
- sb = new StringBuilder();
- }
- }
-
- // End of parsing
-
- record = new ChukwaRecord();
- key = new ChukwaRecordKey();
- buildGenericRecord(record,null,d.getTime(),"NodeActivity");
-
- record.setTime(d.getTime());
- record.add("used", ""+totalUsedNode);
- record.add("free", ""+totalFreeNode);
- record.add("down", ""+totalDownNode);
- record.add("usedMachines", sbUsedMachines.toString());
- record.add("freeMachines", sbFreeMachines.toString());
- record.add("downMachines", sbDownMachines.toString());
-
- output.collect(key,record);
- //log.info("PbsNodeProcessor output 1 NodeActivity");
- }
- catch (ParseException e)
- {
- e.printStackTrace();
- log.warn("Wrong format in PbsNodesProcessor [" + recordEntry + "]", e);
- throw e;
- }
- catch (IOException e)
- {
- log.warn("Unable to collect output in PbsNodesProcessor [" + recordEntry + "]", e);
- e.printStackTrace();
- throw e;
- }
- catch (PbsInvalidEntry e)
- {
- log.warn("Wrong format in PbsNodesProcessor [" + recordEntry + "]", e);
- e.printStackTrace();
- throw e;
- }
-
-
-
-
- }
-
- protected static void parsePbsRecord(String recordLine, ChukwaRecord record)
- {
- int i = 0;
- String[] lines = recordLine.split("\n");
- record.add("Machine", lines[0]);
-
- i++;
- String[] data = null;
- while (i < lines.length)
- {
- data = extractFields(lines[i]);
- record.add(data[0].trim(), data[1].trim());
- if (data[0].trim().equalsIgnoreCase("status"))
- {
- parseStatusField(data[1].trim(), record);
- }
- i++;
- }
- }
-
- protected static void parseStatusField(String statusField,
- ChukwaRecord record)
- {
- String[] data = null;
- String[] subFields = statusField.trim().split(",");
- for (String subflied : subFields)
- {
- data = extractFields(subflied);
- record.add("status-"+data[0].trim(), data[1].trim());
- }
- }
-
-
- static String[] extractFields(String line)
- {
- String[] args = new String[2];
- int index = line.indexOf("=");
- args[0] = line.substring(0,index );
- args[1] = line.substring(index + 1);
-
- return args;
- }
-
- public String getDataType()
- {
- return PbsNodes.rawPBSRecordType;
- }
-
+public class PbsNodes extends AbstractProcessor {
+ static Logger log = Logger.getLogger(PbsNodes.class);
+
+ private static final String rawPBSRecordType = "PbsNodes";
+ private static final String machinePBSRecordType = "MachinePbsNodes";
+ private SimpleDateFormat sdf = null;
+
+ public PbsNodes() {
+ // TODO move that to config
+ sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
+ }
+
+ @Override
+ protected void parse(String recordEntry,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+ throws Throwable {
+
+ // log.info("PbsNodeProcessor record: [" + recordEntry + "] type[" +
+ // chunk.getDataType() + "]");
+
+ StringBuilder sb = new StringBuilder();
+ int i = 0;
+ String nodeActivityStatus = null;
+ StringBuilder sbFreeMachines = new StringBuilder();
+ StringBuilder sbUsedMachines = new StringBuilder();
+ StringBuilder sbDownMachines = new StringBuilder();
+
+ int totalFreeNode = 0;
+ int totalUsedNode = 0;
+ int totalDownNode = 0;
+
+ String body = null;
+ ChukwaRecord record = null;
+
+ try {
+
+ String dStr = recordEntry.substring(0, 23);
+ int start = 24;
+ int idx = recordEntry.indexOf(' ', start);
+ // String level = recordEntry.substring(start, idx);
+ start = idx + 1;
+ idx = recordEntry.indexOf(' ', start);
+ // String className = recordEntry.substring(start, idx-1);
+ body = recordEntry.substring(idx + 1);
+
+ Date d = sdf.parse(dStr);
+
+ String[] lines = body.split("\n");
+ while (i < lines.length) {
+ while ((i < lines.length) && (lines[i].trim().length() > 0)) {
+ sb.append(lines[i].trim()).append("\n");
+ i++;
+ }
+
+ if ((i < lines.length) && (lines[i].trim().length() > 0)) {
+ throw new PbsInvalidEntry(recordEntry);
+ }
+
+ // Empty line
+ i++;
+
+ if (sb.length() > 0) {
+ body = sb.toString();
+ // Process all entries for a machine
+ // System.out.println("=========>>> Record [" + body+ "]");
+
+ record = new ChukwaRecord();
+ key = new ChukwaRecordKey();
+
+ buildGenericRecord(record, null, d.getTime(), machinePBSRecordType);
+ parsePbsRecord(body, record);
+
+ // Output PbsNode record for 1 machine
+ output.collect(key, record);
+ // log.info("PbsNodeProcessor output 1 sub-record");
+
+ // compute Node Activity information
+ nodeActivityStatus = record.getValue("state");
+ if (nodeActivityStatus != null) {
+ if (nodeActivityStatus.equals("free")) {
+ totalFreeNode++;
+ sbFreeMachines.append(record.getValue("Machine")).append(",");
+ } else if (nodeActivityStatus.equals("job-exclusive")) {
+ totalUsedNode++;
+ sbUsedMachines.append(record.getValue("Machine")).append(",");
+ } else {
+ totalDownNode++;
+ sbDownMachines.append(record.getValue("Machine")).append(",");
+ }
+ }
+ sb = new StringBuilder();
+ }
+ }
+
+ // End of parsing
+
+ record = new ChukwaRecord();
+ key = new ChukwaRecordKey();
+ buildGenericRecord(record, null, d.getTime(), "NodeActivity");
+
+ record.setTime(d.getTime());
+ record.add("used", "" + totalUsedNode);
+ record.add("free", "" + totalFreeNode);
+ record.add("down", "" + totalDownNode);
+ record.add("usedMachines", sbUsedMachines.toString());
+ record.add("freeMachines", sbFreeMachines.toString());
+ record.add("downMachines", sbDownMachines.toString());
+
+ output.collect(key, record);
+ // log.info("PbsNodeProcessor output 1 NodeActivity");
+ } catch (ParseException e) {
+ e.printStackTrace();
+ log.warn("Wrong format in PbsNodesProcessor [" + recordEntry + "]", e);
+ throw e;
+ } catch (IOException e) {
+ log.warn("Unable to collect output in PbsNodesProcessor [" + recordEntry
+ + "]", e);
+ e.printStackTrace();
+ throw e;
+ } catch (PbsInvalidEntry e) {
+ log.warn("Wrong format in PbsNodesProcessor [" + recordEntry + "]", e);
+ e.printStackTrace();
+ throw e;
+ }
+
+ }
+
+ protected static void parsePbsRecord(String recordLine, ChukwaRecord record) {
+ int i = 0;
+ String[] lines = recordLine.split("\n");
+ record.add("Machine", lines[0]);
+
+ i++;
+ String[] data = null;
+ while (i < lines.length) {
+ data = extractFields(lines[i]);
+ record.add(data[0].trim(), data[1].trim());
+ if (data[0].trim().equalsIgnoreCase("status")) {
+ parseStatusField(data[1].trim(), record);
+ }
+ i++;
+ }
+ }
+
+ protected static void parseStatusField(String statusField, ChukwaRecord record) {
+ String[] data = null;
+ String[] subFields = statusField.trim().split(",");
+ for (String subField : subFields) {
+ data = extractFields(subField);
+ record.add("status-" + data[0].trim(), data[1].trim());
+ }
+ }
+
+ static String[] extractFields(String line) {
+ String[] args = new String[2];
+ int index = line.indexOf("=");
+ args[0] = line.substring(0, index);
+ args[1] = line.substring(index + 1);
+
+ return args;
+ }
+
+ public String getDataType() {
+ return PbsNodes.rawPBSRecordType;
+ }
+
}
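
For context: pbsnodes output arrives as blank-line-separated machine blocks of "name = value" lines, and the "status" value itself packs comma-separated key=value pairs, which is why parseStatusField() re-runs extractFields() on each sub-field. extractFields() splits only on the first '=', so the nested pairs survive intact. A small self-contained check of that behavior (hypothetical demo class; the sample lines are invented to match the shape just described):

    public class ExtractFieldsDemo {
      // Mirrors PbsNodes.extractFields: split on the first '=' only.
      static String[] extractFields(String line) {
        String[] args = new String[2];
        int index = line.indexOf("=");
        args[0] = line.substring(0, index);
        args[1] = line.substring(index + 1);
        return args;
      }

      public static void main(String[] args) {
        // Plain attribute line: callers trim() both halves.
        String[] data = extractFields("state = free");
        System.out.println(data[0].trim() + " -> " + data[1].trim());

        // The status value keeps its own '=' signs for parseStatusField:
        data = extractFields("status = opsys=linux,loadave=0.10");
        System.out.println(data[0].trim() + " -> " + data[1].trim());
      }
    }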
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ProcessorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ProcessorFactory.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ProcessorFactory.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ProcessorFactory.java Wed Mar 11 22:39:26 2009
@@ -18,65 +18,63 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
-import java.util.HashMap;
+import java.util.HashMap;
import org.apache.log4j.Logger;
+public class ProcessorFactory {
+ static Logger log = Logger.getLogger(ProcessorFactory.class);
-
-public class ProcessorFactory
-{
- static Logger log = Logger.getLogger(ProcessorFactory.class);
-
- // TODO
- // add new mapper package at the end.
- // We should have a more generic way to do this.
- // Ex: read from config
- // list of alias
- // and
- // alias -> processor class
-
-
- private static HashMap<String,ChunkProcessor > processors =
- new HashMap<String, ChunkProcessor>(); // registry
-
- private ProcessorFactory()
- {}
-
- public static ChunkProcessor getProcessor(String recordType)
- throws UnknownRecordTypeException
- {
- String path = "org.apache.hadoop.chukwa.extraction.demux.processor.mapper"+recordType;
- if (processors.containsKey(recordType)) {
- return processors.get(recordType);
- } else {
- ChunkProcessor processor = null;
- try {
- processor = (ChunkProcessor)Class.forName(path).getConstructor().newInstance();
- } catch(ClassNotFoundException e) {
- throw new UnknownRecordTypeException("Unknown recordType:" + recordType, e);
- } catch(Exception e) {
- throw new UnknownRecordTypeException("error constructing processor", e);
- }
-
- //TODO using a ThreadSafe/reuse flag to actually decide if we want
- // to reuse the same processor again and again
- register(recordType,processor);
- return processor;
- }
- }
-
- /** Register a specific parser for a {@link ChunkProcessor}
- * implementation. */
- public static synchronized void register(String recordType,
- ChunkProcessor processor)
- {
- log.info("register " + processor.getClass().getName() + " for this recordType :" + recordType);
- if (processors.containsKey(recordType))
- {
- throw new DuplicateProcessorException("Duplicate processor for recordType:" + recordType);
- }
- ProcessorFactory.processors.put(recordType, processor);
- }
+ // TODO
+ // add new mapper package at the end.
+ // We should have a more generic way to do this.
+ // Ex: read from config
+ // list of alias
+ // and
+ // alias -> processor class
+
+ private static HashMap<String, ChunkProcessor> processors = new HashMap<String, ChunkProcessor>(); // registry
+
+ private ProcessorFactory() {
+ }
+
+ public static ChunkProcessor getProcessor(String recordType)
+ throws UnknownRecordTypeException {
+ String path = "org.apache.hadoop.chukwa.extraction.demux.processor.mapper."
+ + recordType;
+ if (processors.containsKey(recordType)) {
+ return processors.get(recordType);
+ } else {
+ ChunkProcessor processor = null;
+ try {
+ processor = (ChunkProcessor) Class.forName(path).getConstructor()
+ .newInstance();
+ } catch (ClassNotFoundException e) {
+ throw new UnknownRecordTypeException(
+ "Unknown recordType:" + recordType, e);
+ } catch (Exception e) {
+ throw new UnknownRecordTypeException("error constructing processor", e);
+ }
+
+ // TODO use a ThreadSafe/reuse flag to decide whether we want
+ // to reuse the same processor again and again
+ register(recordType, processor);
+ return processor;
+ }
+ }
+
+ /**
+ * Register a specific parser for a {@link ChunkProcessor} implementation.
+ */
+ public static synchronized void register(String recordType,
+ ChunkProcessor processor) {
+ log.info("register " + processor.getClass().getName()
+ + " for recordType: " + recordType);
+ if (processors.containsKey(recordType)) {
+ throw new DuplicateProcessorException(
+ "Duplicate processor for recordType:" + recordType);
+ }
+ ProcessorFactory.processors.put(recordType, processor);
+ }
}
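
Two lookup paths coexist in this registry: getProcessor() reflectively loads "<this package>.<recordType>" and caches the instance, while register() primes the same cache explicitly and throws DuplicateProcessorException on a second binding for one recordType. A hedged sketch of both paths (assumes a SysLog processor class exists in this package, as it does elsewhere in Chukwa, and is run from the same package so the types resolve):

    package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;

    public class RegistrySketch {
      public static void main(String[] args) throws UnknownRecordTypeException {
        // First call reflects, constructs, and caches under "SysLog"...
        ChunkProcessor first = ProcessorFactory.getProcessor("SysLog");
        // ...the second returns the cached instance, no reflection.
        ChunkProcessor second = ProcessorFactory.getProcessor("SysLog");
        System.out.println(first == second); // true

        // Re-binding the same alias is refused:
        // ProcessorFactory.register("SysLog", first); // DuplicateProcessorException
      }
    }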