Posted to commits@chukwa.apache.org by as...@apache.org on 2009/03/11 23:39:32 UTC

svn commit: r752666 [8/16] - in /hadoop/chukwa/trunk: ./ src/java/org/apache/hadoop/chukwa/ src/java/org/apache/hadoop/chukwa/conf/ src/java/org/apache/hadoop/chukwa/database/ src/java/org/apache/hadoop/chukwa/datacollection/ src/java/org/apache/hadoop...

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java Wed Mar 11 22:39:26 2009
@@ -18,12 +18,12 @@
 
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
+
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.chukwa.extraction.engine.Record;
@@ -31,386 +31,375 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
 
-public class JobLogHistoryProcessor extends AbstractProcessor
-{
-	static Logger log = Logger.getLogger(JobLogHistoryProcessor.class);
-
-	private static final String recordType = "JobLogHistory";
-	private static String internalRegex = null;
-	private static Pattern ip = null;
-
-	private Matcher internalMatcher = null;
-
-	public JobLogHistoryProcessor()
-	{
-		internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?";
-		ip = Pattern.compile(internalRegex);
-		internalMatcher = ip.matcher("-");
-	}
-
-	@Override
-	protected void parse(String recordEntry,
-			OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
-			Reporter reporter)
-	 throws Throwable
-	{
-
-//		log.info("JobLogHistoryProcessor record: [" + recordEntry + "] type["
-//				+ chunk.getDataType() + "]");
-
-		try
-		{
-
-			HashMap<String, String> keys = new HashMap<String, String>();
-			ChukwaRecord record = null;
-		
-			int firstSep = recordEntry.indexOf(" ");
-			keys.put("RECORD_TYPE", recordEntry.substring(0, firstSep));
-//			log.info("JobLogHistoryProcessor Add field: [RECORD_TYPE]["
-//					+ keys.get("RECORD_TYPE") + "]");
-
-			String body = recordEntry.substring(firstSep);
-
-			internalMatcher.reset(body);
-
-//			 String fieldName = null;
-//			 String fieldValue = null;
-
-			while (internalMatcher.matches())
-			{
-
-				keys.put(internalMatcher.group(1).trim(), internalMatcher
-						.group(2).trim());
-
-				// TODO Remove debug info before production
-//				 fieldName = internalMatcher.group(1).trim();
-//				 fieldValue = internalMatcher.group(2).trim();
-//				 log.info("JobLogHistoryProcessor Add field: [" + fieldName +
-//				 "][" + fieldValue +"]" );
-//				 log.info("EOL : [" + internalMatcher.group(3) + "]" );
-				internalMatcher.reset(internalMatcher.group(3));
-			} 
-
-			if (!keys.containsKey("JOBID"))
-			{
-				// Extract JobID from taskID
-				// JOBID  = "job_200804210403_0005"
-				// TASKID = "tip_200804210403_0005_m_000018"
-				String jobId = keys.get("TASKID");
-				int idx1 = jobId.indexOf('_',0);
-				int idx2 = jobId.indexOf('_', idx1+1);
-				idx2 = jobId.indexOf('_', idx2+1);
-				keys.put("JOBID",jobId.substring(idx1+1,idx2));
-//				log.info("JobLogHistoryProcessor Add field: [JOBID]["
-//						+ keys.get("JOBID") + "]");
-			}
-			else
-			{
-				String jobId = keys.get("JOBID").replace("_", "").substring(3);
-				keys.put("JOBID",jobId);
-			}
-			// if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
-			// keys.containsKey("SUBMIT_TIME"))
-			// {
-			// // Job JOBID="job_200804210403_0005" JOBNAME="MY_JOB"
-			// USER="userxxx"
-			// // SUBMIT_TIME="1208760436751"
-			// JOBCONF="/mapredsystem/xxx.yyy.com/job_200804210403_0005/job.xml"
-			//					
-			//					
-			// }
-			// else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
-			// keys.containsKey("LAUNCH_TIME"))
-			// {
-			// // Job JOBID="job_200804210403_0005" LAUNCH_TIME="1208760437110"
-			// TOTAL_MAPS="5912" TOTAL_REDUCES="739"
-			//					
-			// }
-			// else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
-			// keys.containsKey("FINISH_TIME"))
-			// {
-			// // Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906816"
-			// JOB_STATUS="SUCCESS" FINISHED_MAPS="5912" FINISHED_REDUCES="739"
-			// FAILED_MAPS="0" FAILED_REDUCES="0"
-			// // COUNTERS="File Systems.Local bytes read:1735053407244,File
-			// Systems.Local bytes written:2610106384012,File Systems.HDFS bytes
-			// read:801605644910,File Systems.HDFS bytes written:44135800,
-			// // Job Counters .Launched map tasks:5912,Job Counters .Launched
-			// reduce tasks:739,Job Counters .Data-local map tasks:5573,Job
-			// Counters .Rack-local map tasks:316,Map-Reduce Framework.
-			// // Map input records:9410696067,Map-Reduce Framework.Map output
-			// records:9410696067,Map-Reduce Framework.Map input
-			// bytes:801599188816,Map-Reduce Framework.Map output
-			// bytes:784427968116,
-			// // Map-Reduce Framework.Combine input records:0,Map-Reduce
-			// Framework.Combine output records:0,Map-Reduce Framework.Reduce
-			// input groups:477265,Map-Reduce Framework.Reduce input
-			// records:739000,
-			// // Map-Reduce Framework.Reduce output records:739000"
-			//					
-			// }
-			// else
-			if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
-					&& keys.containsKey("START_TIME"))
-			{
-				// MapAttempt TASK_TYPE="MAP"
-				// TASKID="tip_200804210403_0005_m_000018"
-				// TASK_ATTEMPT_ID="task_200804210403_0005_m_000018_0"
-				// START_TIME="1208760437531"
-				// HOSTNAME="tracker_xxx.yyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:53734"
-
-				key = new ChukwaRecordKey();
-				key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/" + keys.get("START_TIME"));
-				key.setReduceType("JobLogHistoryReduceProcessor");
-				record = new ChukwaRecord();
-				record.setTime(Long.parseLong(keys.get("START_TIME")));
-				record.add("JOBID",keys.get("JOBID"));
-				record.add("START_TIME", keys.get("START_TIME"));
-				record.add(Record.tagsField, chunk.getTags());
-//				log.info("JobLogHist/Map/S");
-				output.collect(key, record);
-
-			} else if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
-					&& keys.containsKey("FINISH_TIME"))
-			{
-				// MapAttempt TASK_TYPE="MAP"
-				// TASKID="tip_200804210403_0005_m_005494"
-				// TASK_ATTEMPT_ID="task_200804210403_0005_m_005494_0"
-				// TASK_STATUS="SUCCESS"
-				// FINISH_TIME="1208760624124"
-				// HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:55491"
-
-				key = new ChukwaRecordKey();
-				key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/" + keys.get("FINISH_TIME"));
-				key.setReduceType("JobLogHistoryReduceProcessor");
-				record = new ChukwaRecord();
-				record.setTime(Long.parseLong(keys.get("FINISH_TIME")));
-				record.add("JOBID",keys.get("JOBID"));
-				record.add("FINISH_TIME", keys.get("FINISH_TIME"));
-				record.add(Record.tagsField, chunk.getTags());
-//				log.info("JobLogHist/Map/E");
-				output.collect(key, record);
-			}
-
-			else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt")
-					&& keys.containsKey("START_TIME"))
-			{
-				// ReduceAttempt TASK_TYPE="REDUCE"
-				// TASKID="tip_200804210403_0005_r_000138"
-				// TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0"
-				// START_TIME="1208760454885"
-				// HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947"
-
-				key = new ChukwaRecordKey();
-				key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/" + keys.get("START_TIME"));
-				key.setReduceType("JobLogHistoryReduceProcessor");
-				record = new ChukwaRecord();
-				record.setTime(Long.parseLong(keys.get("START_TIME")));
-				record.add("JOBID",keys.get("JOBID"));
-				record.add("START_TIME", keys.get("START_TIME"));
-				record.add(Record.tagsField, chunk.getTags());
-//				log.info("JobLogHist/SHUFFLE/S");
-				output.collect(key, record);
-
-			} else if (keys.get("RECORD_TYPE")
-					.equalsIgnoreCase("ReduceAttempt")
-					&& keys.containsKey("FINISH_TIME"))
-			{
-				// ReduceAttempt TASK_TYPE="REDUCE"
-				// TASKID="tip_200804210403_0005_r_000138"
-				// TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0"
-				// TASK_STATUS="SUCCESS" SHUFFLE_FINISHED="1208760787167"
-				// SORT_FINISHED="1208760787354" FINISH_TIME="1208760802395"
-				// HOSTNAME="tracker__xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947"
-
-				key = new ChukwaRecordKey();
-				key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/" + keys.get("SHUFFLE_FINISHED"));
-				key.setReduceType("JobLogHistoryReduceProcessor");
-				record = new ChukwaRecord();
-				record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
-				record.add("JOBID",keys.get("JOBID"));
-				record.add("SHUFFLE_FINISHED", keys.get("SHUFFLE_FINISHED"));
-				record.add(Record.tagsField, chunk.getTags());
-//				log.info("JobLogHist/SHUFFLE/E");
-				output.collect(key, record);
-
-				// SORT
-				key = new ChukwaRecordKey();
-				key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/" + keys.get("SHUFFLE_FINISHED"));
-				key.setReduceType("JobLogHistoryReduceProcessor");
-				record = new ChukwaRecord();
-				record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
-				record.add("JOBID",keys.get("JOBID"));
-				record.add("START_TIME", keys.get("SHUFFLE_FINISHED"));
-				record.add(Record.tagsField, chunk.getTags());
-//				log.info("JobLogHist/SORT/S");
-				output.collect(key, record);
-
-				key = new ChukwaRecordKey();
-				key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/" + keys.get("SORT_FINISHED"));
-				key.setReduceType("JobLogHistoryReduceProcessor");
-				record = new ChukwaRecord();
-				record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
-				record.add("JOBID",keys.get("JOBID"));
-				record.add("SORT_FINISHED", keys.get("SORT_FINISHED"));
-				record.add(Record.tagsField, chunk.getTags());
-//				log.info("JobLogHist/SORT/E");
-				output.collect(key, record);
-
-				// Reduce
-				key = new ChukwaRecordKey();
-				key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/" + keys.get("SORT_FINISHED"));
-				key.setReduceType("JobLogHistoryReduceProcessor");
-				record = new ChukwaRecord();
-				record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
-				record.add("JOBID",keys.get("JOBID"));
-				record.add("START_TIME", keys.get("SORT_FINISHED"));
-				record.add(Record.tagsField, chunk.getTags());
-//				log.info("JobLogHist/REDUCE/S");
-				output.collect(key, record);
-
-				key = new ChukwaRecordKey();
-				key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/" + keys.get("FINISH_TIME"));
-				key.setReduceType("JobLogHistoryReduceProcessor");
-				record = new ChukwaRecord();
-				record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
-				record.add("JOBID",keys.get("JOBID"));
-				record.add("FINISH_TIME", keys.get("SORT_FINISHED"));
-				record.add(Record.tagsField, chunk.getTags());
-//				log.info("JobLogHist/REDUCE/E");
-				output.collect(key, record);
-
-			} else if ( keys.get("RECORD_TYPE").equalsIgnoreCase("Job") )
-			{
-				// 1
-				// Job JOBID="job_200809062051_0001" JOBNAME="wordcount" USER="xxx" 
-				// SUBMIT_TIME="1208760906812" 
-				// JOBCONF="/user/xxx/mapredsystem/563976.yyy.zzz.com/job_200809062051_0001/job.xml"
-				
-				// 2
-				// Job JOBID="job_200809062051_0001" LAUNCH_TIME="1208760906816" TOTAL_MAPS="3" TOTAL_REDUCES="7"
-				
-				// 3
-				// Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906826"
-				// JOB_STATUS="SUCCESS" FINISHED_MAPS="5912"
-				// FINISHED_REDUCES="739" FAILED_MAPS="0" FAILED_REDUCES="0"
-				// COUNTERS="File Systems.Local bytes read:1735053407244,File
-				// Systems.Local bytes written:2610106384012,File Systems.HDFS
-				// bytes read:801605644910,File Systems.HDFS bytes
-				// written:44135800,
-				// Job Counters .Launched map tasks:5912,Job Counters .Launched
-				// reduce tasks:739,Job Counters .Data-local map tasks:5573,Job
-				// Counters .Rack-local map tasks:316,Map-Reduce Framework.
-				// Map input records:9410696067,Map-Reduce Framework.Map output
-				// records:9410696067,Map-Reduce Framework.Map input
-				// bytes:801599188816,Map-Reduce Framework.Map output
-				// bytes:784427968116,
-				// Map-Reduce Framework.Combine input records:0,Map-Reduce
-				// Framework.Combine output records:0,Map-Reduce
-				// Framework.Reduce input groups:477265,Map-Reduce
-				// Framework.Reduce input records:739000,
-				// Map-Reduce Framework.Reduce output records:739000"
-
-				record = new ChukwaRecord();
-				key = new ChukwaRecordKey();
-				buildGenericRecord(record, null, Long.parseLong(keys.get("FINISH_TIME")), "MRJob");
-				if (keys.containsKey("COUNTERS"))
-				{
-					extractCounters(record, keys.get("COUNTERS"));
-				}
-
-				key = new ChukwaRecordKey();
-				key.setKey("MRJob/" + keys.get("JOBID") );
-				key.setReduceType("MRJobReduceProcessor");
-				
-				record = new ChukwaRecord();
-				record.add(Record.tagsField, chunk.getTags());
-				if (keys.containsKey("SUBMIT_TIME"))
-				{ record.setTime(Long.parseLong(keys.get("SUBMIT_TIME"))); }
-				else if (keys.containsKey("LAUNCH_TIME"))
-				{ record.setTime(Long.parseLong(keys.get("LAUNCH_TIME"))); }
-				else if (keys.containsKey("FINISH_TIME"))
-				{ record.setTime(Long.parseLong(keys.get("FINISH_TIME"))); }
-				
-				Iterator<String> it = keys.keySet().iterator();
-				while(it.hasNext())
-				{
-					String field = it.next();
-			        record.add(field, keys.get(field));
-				}
-				 
-				 
-				output.collect(key, record);
-			}
-
-			if (keys.containsKey("TASK_TYPE")
-					&& keys.containsKey("COUNTERS")
-					&& (keys.get("TASK_TYPE").equalsIgnoreCase("REDUCE") || keys
-							.get("TASK_TYPE").equalsIgnoreCase("MAP")))
-			{
-				// MAP
-				// Task TASKID="tip_200804210403_0005_m_000154" TASK_TYPE="MAP"
-				// TASK_STATUS="SUCCESS" FINISH_TIME="1208760463883"
-				// COUNTERS="File Systems.Local bytes read:159265655,File
-				// Systems.Local bytes written:318531310,
-				// File Systems.HDFS bytes read:145882417,Map-Reduce
-				// Framework.Map input records:1706604,
-				// Map-Reduce Framework.Map output records:1706604,Map-Reduce
-				// Framework.Map input bytes:145882057,
-				// Map-Reduce Framework.Map output bytes:142763253,Map-Reduce
-				// Framework.Combine input records:0,Map-Reduce
-				// Framework.Combine output records:0"
-
-				// REDUCE
-				// Task TASKID="tip_200804210403_0005_r_000524"
-				// TASK_TYPE="REDUCE" TASK_STATUS="SUCCESS"
-				// FINISH_TIME="1208760877072"
-				// COUNTERS="File Systems.Local bytes read:1179319677,File
-				// Systems.Local bytes written:1184474889,File Systems.HDFS
-				// bytes written:59021,
-				// Map-Reduce Framework.Reduce input groups:684,Map-Reduce
-				// Framework.Reduce input records:1000,Map-Reduce
-				// Framework.Reduce output records:1000"
-
-				record = new ChukwaRecord();
-				key = new ChukwaRecordKey();
-				buildGenericRecord(record, null, Long.parseLong(keys.get("FINISH_TIME")), "SizeVsFinishTime");
-				extractCounters(record, keys.get("COUNTERS"));
-				record.add("JOBID",keys.get("JOBID"));
-				record.add("TASKID", keys.get("TASKID"));
-				record.add("TASK_TYPE", keys.get("TASK_TYPE"));
-				record.add(Record.tagsField, chunk.getTags());
-//				log.info("MR_Graph +1");
-				output.collect(key, record);
-
-			}
-		} 
-		catch (IOException e)
-		{
-			log.warn("Unable to collect output in JobLogHistoryProcessor ["
-					+ recordEntry + "]", e);
-			e.printStackTrace();
-			throw e;
-		}
-
-	}
-
-	protected void extractCounters(ChukwaRecord record, String input)
-	{
-
-		String[] data = null;
-		String[] counters = input.split(",");
-
-		for (String counter : counters)
-		{
-			data = counter.split(":");
-			record.add(data[0].replaceAll(" ", "_").replaceAll("\\.", "_")
-					.toUpperCase(), data[1]);
-		}
-	}
-
-	public String getDataType()
-	{
-		return JobLogHistoryProcessor.recordType;
-	}
+public class JobLogHistoryProcessor extends AbstractProcessor {
+  static Logger log = Logger.getLogger(JobLogHistoryProcessor.class);
+
+  private static final String recordType = "JobLogHistory";
+  private static String internalRegex = null;
+  private static Pattern ip = null;
+
+  private Matcher internalMatcher = null;
+
+  public JobLogHistoryProcessor() {
+    internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?";
+    ip = Pattern.compile(internalRegex);
+    internalMatcher = ip.matcher("-");
+  }
+
+  @Override
+  protected void parse(String recordEntry,
+      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+      throws Throwable {
+
+    // log.info("JobLogHistoryProcessor record: [" + recordEntry + "] type["
+    // + chunk.getDataType() + "]");
+
+    try {
+
+      HashMap<String, String> keys = new HashMap<String, String>();
+      ChukwaRecord record = null;
+
+      int firstSep = recordEntry.indexOf(" ");
+      keys.put("RECORD_TYPE", recordEntry.substring(0, firstSep));
+      // log.info("JobLogHistoryProcessor Add field: [RECORD_TYPE]["
+      // + keys.get("RECORD_TYPE") + "]");
+
+      String body = recordEntry.substring(firstSep);
+
+      internalMatcher.reset(body);
+
+      // String fieldName = null;
+      // String fieldValue = null;
+
+      while (internalMatcher.matches()) {
+
+        keys.put(internalMatcher.group(1).trim(), internalMatcher.group(2)
+            .trim());
+
+        // TODO Remove debug info before production
+        // fieldName = internalMatcher.group(1).trim();
+        // fieldValue = internalMatcher.group(2).trim();
+        // log.info("JobLogHistoryProcessor Add field: [" + fieldName +
+        // "][" + fieldValue +"]" );
+        // log.info("EOL : [" + internalMatcher.group(3) + "]" );
+        internalMatcher.reset(internalMatcher.group(3));
+      }
+
+      if (!keys.containsKey("JOBID")) {
+        // Extract JobID from taskID
+        // JOBID = "job_200804210403_0005"
+        // TASKID = "tip_200804210403_0005_m_000018"
+        String jobId = keys.get("TASKID");
+        int idx1 = jobId.indexOf('_', 0);
+        int idx2 = jobId.indexOf('_', idx1 + 1);
+        idx2 = jobId.indexOf('_', idx2 + 1);
+        keys.put("JOBID", jobId.substring(idx1 + 1, idx2));
+        // log.info("JobLogHistoryProcessor Add field: [JOBID]["
+        // + keys.get("JOBID") + "]");
+      } else {
+        String jobId = keys.get("JOBID").replace("_", "").substring(3);
+        keys.put("JOBID", jobId);
+      }
+      // if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
+      // keys.containsKey("SUBMIT_TIME"))
+      // {
+      // // Job JOBID="job_200804210403_0005" JOBNAME="MY_JOB"
+      // USER="userxxx"
+      // // SUBMIT_TIME="1208760436751"
+      // JOBCONF="/mapredsystem/xxx.yyy.com/job_200804210403_0005/job.xml"
+      //					
+      //					
+      // }
+      // else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
+      // keys.containsKey("LAUNCH_TIME"))
+      // {
+      // // Job JOBID="job_200804210403_0005" LAUNCH_TIME="1208760437110"
+      // TOTAL_MAPS="5912" TOTAL_REDUCES="739"
+      //					
+      // }
+      // else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
+      // keys.containsKey("FINISH_TIME"))
+      // {
+      // // Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906816"
+      // JOB_STATUS="SUCCESS" FINISHED_MAPS="5912" FINISHED_REDUCES="739"
+      // FAILED_MAPS="0" FAILED_REDUCES="0"
+      // // COUNTERS="File Systems.Local bytes read:1735053407244,File
+      // Systems.Local bytes written:2610106384012,File Systems.HDFS bytes
+      // read:801605644910,File Systems.HDFS bytes written:44135800,
+      // // Job Counters .Launched map tasks:5912,Job Counters .Launched
+      // reduce tasks:739,Job Counters .Data-local map tasks:5573,Job
+      // Counters .Rack-local map tasks:316,Map-Reduce Framework.
+      // // Map input records:9410696067,Map-Reduce Framework.Map output
+      // records:9410696067,Map-Reduce Framework.Map input
+      // bytes:801599188816,Map-Reduce Framework.Map output
+      // bytes:784427968116,
+      // // Map-Reduce Framework.Combine input records:0,Map-Reduce
+      // Framework.Combine output records:0,Map-Reduce Framework.Reduce
+      // input groups:477265,Map-Reduce Framework.Reduce input
+      // records:739000,
+      // // Map-Reduce Framework.Reduce output records:739000"
+      //					
+      // }
+      // else
+      if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
+          && keys.containsKey("START_TIME")) {
+        // MapAttempt TASK_TYPE="MAP"
+        // TASKID="tip_200804210403_0005_m_000018"
+        // TASK_ATTEMPT_ID="task_200804210403_0005_m_000018_0"
+        // START_TIME="1208760437531"
+        // HOSTNAME="tracker_xxx.yyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:53734"
+
+        key = new ChukwaRecordKey();
+        key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/"
+            + keys.get("START_TIME"));
+        key.setReduceType("JobLogHistoryReduceProcessor");
+        record = new ChukwaRecord();
+        record.setTime(Long.parseLong(keys.get("START_TIME")));
+        record.add("JOBID", keys.get("JOBID"));
+        record.add("START_TIME", keys.get("START_TIME"));
+        record.add(Record.tagsField, chunk.getTags());
+        // log.info("JobLogHist/Map/S");
+        output.collect(key, record);
+
+      } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
+          && keys.containsKey("FINISH_TIME")) {
+        // MapAttempt TASK_TYPE="MAP"
+        // TASKID="tip_200804210403_0005_m_005494"
+        // TASK_ATTEMPT_ID="task_200804210403_0005_m_005494_0"
+        // TASK_STATUS="SUCCESS"
+        // FINISH_TIME="1208760624124"
+        // HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:55491"
+
+        key = new ChukwaRecordKey();
+        key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/"
+            + keys.get("FINISH_TIME"));
+        key.setReduceType("JobLogHistoryReduceProcessor");
+        record = new ChukwaRecord();
+        record.setTime(Long.parseLong(keys.get("FINISH_TIME")));
+        record.add("JOBID", keys.get("JOBID"));
+        record.add("FINISH_TIME", keys.get("FINISH_TIME"));
+        record.add(Record.tagsField, chunk.getTags());
+        // log.info("JobLogHist/Map/E");
+        output.collect(key, record);
+      }
+
+      else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt")
+          && keys.containsKey("START_TIME")) {
+        // ReduceAttempt TASK_TYPE="REDUCE"
+        // TASKID="tip_200804210403_0005_r_000138"
+        // TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0"
+        // START_TIME="1208760454885"
+        // HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947"
+
+        key = new ChukwaRecordKey();
+        key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/"
+            + keys.get("START_TIME"));
+        key.setReduceType("JobLogHistoryReduceProcessor");
+        record = new ChukwaRecord();
+        record.setTime(Long.parseLong(keys.get("START_TIME")));
+        record.add("JOBID", keys.get("JOBID"));
+        record.add("START_TIME", keys.get("START_TIME"));
+        record.add(Record.tagsField, chunk.getTags());
+        // log.info("JobLogHist/SHUFFLE/S");
+        output.collect(key, record);
+
+      } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt")
+          && keys.containsKey("FINISH_TIME")) {
+        // ReduceAttempt TASK_TYPE="REDUCE"
+        // TASKID="tip_200804210403_0005_r_000138"
+        // TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0"
+        // TASK_STATUS="SUCCESS" SHUFFLE_FINISHED="1208760787167"
+        // SORT_FINISHED="1208760787354" FINISH_TIME="1208760802395"
+        // HOSTNAME="tracker__xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947"
+
+        key = new ChukwaRecordKey();
+        key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/"
+            + keys.get("SHUFFLE_FINISHED"));
+        key.setReduceType("JobLogHistoryReduceProcessor");
+        record = new ChukwaRecord();
+        record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
+        record.add("JOBID", keys.get("JOBID"));
+        record.add("SHUFFLE_FINISHED", keys.get("SHUFFLE_FINISHED"));
+        record.add(Record.tagsField, chunk.getTags());
+        // log.info("JobLogHist/SHUFFLE/E");
+        output.collect(key, record);
+
+        // SORT
+        key = new ChukwaRecordKey();
+        key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/"
+            + keys.get("SHUFFLE_FINISHED"));
+        key.setReduceType("JobLogHistoryReduceProcessor");
+        record = new ChukwaRecord();
+        record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
+        record.add("JOBID", keys.get("JOBID"));
+        record.add("START_TIME", keys.get("SHUFFLE_FINISHED"));
+        record.add(Record.tagsField, chunk.getTags());
+        // log.info("JobLogHist/SORT/S");
+        output.collect(key, record);
+
+        key = new ChukwaRecordKey();
+        key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/"
+            + keys.get("SORT_FINISHED"));
+        key.setReduceType("JobLogHistoryReduceProcessor");
+        record = new ChukwaRecord();
+        record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
+        record.add("JOBID", keys.get("JOBID"));
+        record.add("SORT_FINISHED", keys.get("SORT_FINISHED"));
+        record.add(Record.tagsField, chunk.getTags());
+        // log.info("JobLogHist/SORT/E");
+        output.collect(key, record);
+
+        // Reduce
+        key = new ChukwaRecordKey();
+        key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/"
+            + keys.get("SORT_FINISHED"));
+        key.setReduceType("JobLogHistoryReduceProcessor");
+        record = new ChukwaRecord();
+        record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
+        record.add("JOBID", keys.get("JOBID"));
+        record.add("START_TIME", keys.get("SORT_FINISHED"));
+        record.add(Record.tagsField, chunk.getTags());
+        // log.info("JobLogHist/REDUCE/S");
+        output.collect(key, record);
+
+        key = new ChukwaRecordKey();
+        key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/"
+            + keys.get("FINISH_TIME"));
+        key.setReduceType("JobLogHistoryReduceProcessor");
+        record = new ChukwaRecord();
+        record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
+        record.add("JOBID", keys.get("JOBID"));
+        record.add("FINISH_TIME", keys.get("SORT_FINISHED"));
+        record.add(Record.tagsField, chunk.getTags());
+        // log.info("JobLogHist/REDUCE/E");
+        output.collect(key, record);
+
+      } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job")) {
+        // 1
+        // Job JOBID="job_200809062051_0001" JOBNAME="wordcount" USER="xxx"
+        // SUBMIT_TIME="1208760906812"
+        // JOBCONF=
+        // "/user/xxx/mapredsystem/563976.yyy.zzz.com/job_200809062051_0001/job.xml"
+
+        // 2
+        // Job JOBID="job_200809062051_0001" LAUNCH_TIME="1208760906816"
+        // TOTAL_MAPS="3" TOTAL_REDUCES="7"
+
+        // 3
+        // Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906826"
+        // JOB_STATUS="SUCCESS" FINISHED_MAPS="5912"
+        // FINISHED_REDUCES="739" FAILED_MAPS="0" FAILED_REDUCES="0"
+        // COUNTERS="File Systems.Local bytes read:1735053407244,File
+        // Systems.Local bytes written:2610106384012,File Systems.HDFS
+        // bytes read:801605644910,File Systems.HDFS bytes
+        // written:44135800,
+        // Job Counters .Launched map tasks:5912,Job Counters .Launched
+        // reduce tasks:739,Job Counters .Data-local map tasks:5573,Job
+        // Counters .Rack-local map tasks:316,Map-Reduce Framework.
+        // Map input records:9410696067,Map-Reduce Framework.Map output
+        // records:9410696067,Map-Reduce Framework.Map input
+        // bytes:801599188816,Map-Reduce Framework.Map output
+        // bytes:784427968116,
+        // Map-Reduce Framework.Combine input records:0,Map-Reduce
+        // Framework.Combine output records:0,Map-Reduce
+        // Framework.Reduce input groups:477265,Map-Reduce
+        // Framework.Reduce input records:739000,
+        // Map-Reduce Framework.Reduce output records:739000"
+
+        record = new ChukwaRecord();
+        key = new ChukwaRecordKey();
+        buildGenericRecord(record, null, Long
+            .parseLong(keys.get("FINISH_TIME")), "MRJob");
+        if (keys.containsKey("COUNTERS")) {
+          extractCounters(record, keys.get("COUNTERS"));
+        }
+
+        key = new ChukwaRecordKey();
+        key.setKey("MRJob/" + keys.get("JOBID"));
+        key.setReduceType("MRJobReduceProcessor");
+
+        record = new ChukwaRecord();
+        record.add(Record.tagsField, chunk.getTags());
+        if (keys.containsKey("SUBMIT_TIME")) {
+          record.setTime(Long.parseLong(keys.get("SUBMIT_TIME")));
+        } else if (keys.containsKey("LAUNCH_TIME")) {
+          record.setTime(Long.parseLong(keys.get("LAUNCH_TIME")));
+        } else if (keys.containsKey("FINISH_TIME")) {
+          record.setTime(Long.parseLong(keys.get("FINISH_TIME")));
+        }
+
+        Iterator<String> it = keys.keySet().iterator();
+        while (it.hasNext()) {
+          String field = it.next();
+          record.add(field, keys.get(field));
+        }
+
+        output.collect(key, record);
+      }
+
+      if (keys.containsKey("TASK_TYPE")
+          && keys.containsKey("COUNTERS")
+          && (keys.get("TASK_TYPE").equalsIgnoreCase("REDUCE") || keys.get(
+              "TASK_TYPE").equalsIgnoreCase("MAP"))) {
+        // MAP
+        // Task TASKID="tip_200804210403_0005_m_000154" TASK_TYPE="MAP"
+        // TASK_STATUS="SUCCESS" FINISH_TIME="1208760463883"
+        // COUNTERS="File Systems.Local bytes read:159265655,File
+        // Systems.Local bytes written:318531310,
+        // File Systems.HDFS bytes read:145882417,Map-Reduce
+        // Framework.Map input records:1706604,
+        // Map-Reduce Framework.Map output records:1706604,Map-Reduce
+        // Framework.Map input bytes:145882057,
+        // Map-Reduce Framework.Map output bytes:142763253,Map-Reduce
+        // Framework.Combine input records:0,Map-Reduce
+        // Framework.Combine output records:0"
+
+        // REDUCE
+        // Task TASKID="tip_200804210403_0005_r_000524"
+        // TASK_TYPE="REDUCE" TASK_STATUS="SUCCESS"
+        // FINISH_TIME="1208760877072"
+        // COUNTERS="File Systems.Local bytes read:1179319677,File
+        // Systems.Local bytes written:1184474889,File Systems.HDFS
+        // bytes written:59021,
+        // Map-Reduce Framework.Reduce input groups:684,Map-Reduce
+        // Framework.Reduce input records:1000,Map-Reduce
+        // Framework.Reduce output records:1000"
+
+        record = new ChukwaRecord();
+        key = new ChukwaRecordKey();
+        buildGenericRecord(record, null, Long
+            .parseLong(keys.get("FINISH_TIME")), "SizeVsFinishTime");
+        extractCounters(record, keys.get("COUNTERS"));
+        record.add("JOBID", keys.get("JOBID"));
+        record.add("TASKID", keys.get("TASKID"));
+        record.add("TASK_TYPE", keys.get("TASK_TYPE"));
+        record.add(Record.tagsField, chunk.getTags());
+        // log.info("MR_Graph +1");
+        output.collect(key, record);
+
+      }
+    } catch (IOException e) {
+      log.warn("Unable to collect output in JobLogHistoryProcessor ["
+          + recordEntry + "]", e);
+      e.printStackTrace();
+      throw e;
+    }
+
+  }
+
+  protected void extractCounters(ChukwaRecord record, String input) {
+
+    String[] data = null;
+    String[] counters = input.split(",");
+
+    for (String counter : counters) {
+      data = counter.split(":");
+      record.add(data[0].replaceAll(" ", "_").replaceAll("\\.", "_")
+          .toUpperCase(), data[1]);
+    }
+  }
+
+  public String getDataType() {
+    return JobLogHistoryProcessor.recordType;
+  }
 }

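The hunk above only replaces tab indentation with the two-space style used across the reformatted tree; the parsing logic is unchanged. As a reading aid, here is a standalone sketch that is not part of the commit (the class name, sample log line, and sample COUNTERS string are invented for illustration) but reproduces the processor's key="value" tokenizer loop, the JOBID-from-TASKID derivation, and the extractCounters field normalization, so the diff can be followed without running Chukwa:

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JobHistoryLineSketch {

  // Same pattern the processor compiles in its constructor.
  private static final Pattern KV =
      Pattern.compile("(.*?)=\"(.*?)\"(.*)([\\n])?");

  public static void main(String[] args) {
    String line = "MapAttempt TASK_TYPE=\"MAP\""
        + " TASKID=\"tip_200804210403_0005_m_000018\""
        + " START_TIME=\"1208760437531\"";

    Map<String, String> keys = new HashMap<String, String>();
    int firstSep = line.indexOf(' ');
    keys.put("RECORD_TYPE", line.substring(0, firstSep));

    // Tokenizer loop: match one key="value" pair at the front of the
    // buffer, then re-feed the unmatched tail (group 3) until no pair
    // is left. This mirrors the while (internalMatcher.matches()) loop.
    Matcher m = KV.matcher(line.substring(firstSep));
    while (m.matches()) {
      keys.put(m.group(1).trim(), m.group(2).trim());
      m.reset(m.group(3));
    }

    // JOBID derivation used when the record carries only a TASKID:
    // "tip_200804210403_0005_m_000018" -> "200804210403_0005"
    if (!keys.containsKey("JOBID")) {
      String taskId = keys.get("TASKID");
      int idx1 = taskId.indexOf('_', 0);
      int idx2 = taskId.indexOf('_', idx1 + 1);
      idx2 = taskId.indexOf('_', idx2 + 1);
      keys.put("JOBID", taskId.substring(idx1 + 1, idx2));
    }
    System.out.println(keys); // all five fields, JOBID included

    // Counter normalization, as in extractCounters(): comma-separated
    // "name:value" entries; spaces and dots in the name become
    // underscores and the name is upper-cased.
    String counters = "File Systems.Local bytes read:159265655,"
        + "Map-Reduce Framework.Map input records:1706604";
    for (String counter : counters.split(",")) {
      String[] data = counter.split(":");
      System.out.println(data[0].replaceAll(" ", "_")
          .replaceAll("\\.", "_").toUpperCase() + " = " + data[1]);
    }
  }
}

The sketch compiles and runs with javac/java alone; no Chukwa classes are needed. Note that the loop re-scans the whole remaining tail on every iteration, which is fine for short job-history lines. Also note that the two files in this commit derive JOBID differently: JobLogHistoryProcessor keeps the bare "200804210403_0005" form as above, while Log4jJobHistoryProcessor (below) prepends "job" to the extracted substring.
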
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4jJobHistoryProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4jJobHistoryProcessor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4jJobHistoryProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4jJobHistoryProcessor.java Wed Mar 11 22:39:26 2009
@@ -18,11 +18,11 @@
 
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
+
 import java.io.IOException;
 import java.util.Hashtable;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.chukwa.extraction.engine.Record;
@@ -30,367 +30,359 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
 
-public class Log4jJobHistoryProcessor extends AbstractProcessor
-{
-	static Logger log = Logger.getLogger(Log4jJobHistoryProcessor.class);
-
-	private static final String recordType = "JobLogHistory";
-	private static String internalRegex = null;
-	private static Pattern ip = null;
-
-	private Matcher internalMatcher = null;
-
-	public Log4jJobHistoryProcessor()
-	{
-		internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?";
-		ip = Pattern.compile(internalRegex);
-		internalMatcher = ip.matcher("-");
-	}
-
-	@Override
-	protected void parse(String recordEntry,
-			OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
-			Reporter reporter)
-	 throws Throwable
-	{
-
-//		log.info("JobLogHistoryProcessor record: [" + recordEntry + "] type["
-//				+ chunk.getDataType() + "]");
-
-		try
-		{
-
-			//String dStr = recordEntry.substring(0, 23);
-			int start = 24;
-			int idx = recordEntry.indexOf(' ', start);
-			// String level = recordEntry.substring(start, idx);
-			start = idx + 1;
-			idx = recordEntry.indexOf(' ', start);
-			// String className = recordEntry.substring(start, idx-1);
-			String body = recordEntry.substring(idx + 1);
-
-			Hashtable<String, String> keys = new Hashtable<String, String>();
-			ChukwaRecord record = null;
-		
-			int firstSep = body.indexOf(" ");
-			keys.put("RECORD_TYPE", body.substring(0, firstSep));
-//			log.info("JobLogHistoryProcessor Add field: [RECORD_TYPE]["
-//					+ keys.get("RECORD_TYPE") + "]");
-
-			body = body.substring(firstSep);
-
-			internalMatcher.reset(body);
-
-//			 String fieldName = null;
-//			 String fieldValue = null;
-
-			while (internalMatcher.matches())
-			{
-
-				keys.put(internalMatcher.group(1).trim(), internalMatcher
-						.group(2).trim());
-
-				// TODO Remove debug info before production
-//				 fieldName = internalMatcher.group(1).trim();
-//				 fieldValue = internalMatcher.group(2).trim();
-//				 log.info("JobLogHistoryProcessor Add field: [" + fieldName +
-//				 "][" + fieldValue +"]" );
-//				 log.info("EOL : [" + internalMatcher.group(3) + "]" );
-				internalMatcher.reset(internalMatcher.group(3));
-			} 
-
-			if (!keys.containsKey("JOBID"))
-			{
-				// Extract JobID from taskID
-				// JOBID  = "job_200804210403_0005"
-				// TASKID = "tip_200804210403_0005_m_000018"
-				String jobId = keys.get("TASKID");
-				int idx1 = jobId.indexOf('_',0);
-				int idx2 = jobId.indexOf('_', idx1+1);
-				idx2 = jobId.indexOf('_', idx2+1);
-				keys.put("JOBID","job" + jobId.substring(idx1,idx2));
-//				log.info("JobLogHistoryProcessor Add field: [JOBID]["
-//						+ keys.get("JOBID") + "]");
-			}
-			
-			// if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
-			// keys.containsKey("SUBMIT_TIME"))
-			// {
-			// // Job JOBID="job_200804210403_0005" JOBNAME="MY_JOB"
-			// USER="userxxx"
-			// // SUBMIT_TIME="1208760436751"
-			// JOBCONF="/mapredsystem/xxx.yyy.com/job_200804210403_0005/job.xml"
-			//					
-			//					
-			// }
-			// else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
-			// keys.containsKey("LAUNCH_TIME"))
-			// {
-			// // Job JOBID="job_200804210403_0005" LAUNCH_TIME="1208760437110"
-			// TOTAL_MAPS="5912" TOTAL_REDUCES="739"
-			//					
-			// }
-			// else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
-			// keys.containsKey("FINISH_TIME"))
-			// {
-			// // Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906816"
-			// JOB_STATUS="SUCCESS" FINISHED_MAPS="5912" FINISHED_REDUCES="739"
-			// FAILED_MAPS="0" FAILED_REDUCES="0"
-			// // COUNTERS="File Systems.Local bytes read:1735053407244,File
-			// Systems.Local bytes written:2610106384012,File Systems.HDFS bytes
-			// read:801605644910,File Systems.HDFS bytes written:44135800,
-			// // Job Counters .Launched map tasks:5912,Job Counters .Launched
-			// reduce tasks:739,Job Counters .Data-local map tasks:5573,Job
-			// Counters .Rack-local map tasks:316,Map-Reduce Framework.
-			// // Map input records:9410696067,Map-Reduce Framework.Map output
-			// records:9410696067,Map-Reduce Framework.Map input
-			// bytes:801599188816,Map-Reduce Framework.Map output
-			// bytes:784427968116,
-			// // Map-Reduce Framework.Combine input records:0,Map-Reduce
-			// Framework.Combine output records:0,Map-Reduce Framework.Reduce
-			// input groups:477265,Map-Reduce Framework.Reduce input
-			// records:739000,
-			// // Map-Reduce Framework.Reduce output records:739000"
-			//					
-			// }
-			// else
-			if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
-					&& keys.containsKey("START_TIME"))
-			{
-				// MapAttempt TASK_TYPE="MAP"
-				// TASKID="tip_200804210403_0005_m_000018"
-				// TASK_ATTEMPT_ID="task_200804210403_0005_m_000018_0"
-				// START_TIME="1208760437531"
-				// HOSTNAME="tracker_xxx.yyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:53734"
-
-				key = new ChukwaRecordKey();
-				key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/" + keys.get("START_TIME"));
-				key.setReduceType("JobLogHistoryReduceProcessor");
-				record = new ChukwaRecord();
-				record.setTime(Long.parseLong(keys.get("START_TIME")));
-				record.add("JOBID",keys.get("JOBID"));
-				record.add("START_TIME", keys.get("START_TIME"));
-				record.add(Record.tagsField, chunk.getTags());
-//				log.info("JobLogHist/Map/S");
-				output.collect(key, record);
-
-			} else if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
-					&& keys.containsKey("FINISH_TIME"))
-			{
-				// MapAttempt TASK_TYPE="MAP"
-				// TASKID="tip_200804210403_0005_m_005494"
-				// TASK_ATTEMPT_ID="task_200804210403_0005_m_005494_0"
-				// TASK_STATUS="SUCCESS"
-				// FINISH_TIME="1208760624124"
-				// HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:55491"
-
-				key = new ChukwaRecordKey();
-				key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/" + keys.get("FINISH_TIME"));
-				key.setReduceType("JobLogHistoryReduceProcessor");
-				record = new ChukwaRecord();
-				record.setTime(Long.parseLong(keys.get("FINISH_TIME")));
-				record.add("JOBID",keys.get("JOBID"));
-				record.add("FINISH_TIME", keys.get("FINISH_TIME"));
-				record.add(Record.tagsField, chunk.getTags());
-//				log.info("JobLogHist/Map/E");
-				output.collect(key, record);
-			}
-
-			else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt")
-					&& keys.containsKey("START_TIME"))
-			{
-				// ReduceAttempt TASK_TYPE="REDUCE"
-				// TASKID="tip_200804210403_0005_r_000138"
-				// TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0"
-				// START_TIME="1208760454885"
-				// HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947"
-
-				key = new ChukwaRecordKey();
-				key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/" + keys.get("START_TIME"));
-				key.setReduceType("JobLogHistoryReduceProcessor");
-				record = new ChukwaRecord();
-				record.setTime(Long.parseLong(keys.get("START_TIME")));
-				record.add("JOBID",keys.get("JOBID"));
-				record.add("START_TIME", keys.get("START_TIME"));
-				record.add(Record.tagsField, chunk.getTags());
-//				log.info("JobLogHist/SHUFFLE/S");
-				output.collect(key, record);
-
-			} else if (keys.get("RECORD_TYPE")
-					.equalsIgnoreCase("ReduceAttempt")
-					&& keys.containsKey("FINISH_TIME"))
-			{
-				// ReduceAttempt TASK_TYPE="REDUCE"
-				// TASKID="tip_200804210403_0005_r_000138"
-				// TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0"
-				// TASK_STATUS="SUCCESS" SHUFFLE_FINISHED="1208760787167"
-				// SORT_FINISHED="1208760787354" FINISH_TIME="1208760802395"
-				// HOSTNAME="tracker__xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947"
-
-				key = new ChukwaRecordKey();
-				key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/" + keys.get("SHUFFLE_FINISHED"));
-				key.setReduceType("JobLogHistoryReduceProcessor");
-				record = new ChukwaRecord();
-				record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
-				record.add("JOBID",keys.get("JOBID"));
-				record.add("SHUFFLE_FINISHED", keys.get("SHUFFLE_FINISHED"));
-				record.add(Record.tagsField, chunk.getTags());
-//				log.info("JobLogHist/SHUFFLE/E");
-				output.collect(key, record);
-
-				// SORT
-				key = new ChukwaRecordKey();
-				key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/" + keys.get("SHUFFLE_FINISHED"));
-				key.setReduceType("JobLogHistoryReduceProcessor");
-				record = new ChukwaRecord();
-				record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
-				record.add("JOBID",keys.get("JOBID"));
-				record.add("START_TIME", keys.get("SHUFFLE_FINISHED"));
-				record.add(Record.tagsField, chunk.getTags());
-//				log.info("JobLogHist/SORT/S");
-				output.collect(key, record);
-
-				key = new ChukwaRecordKey();
-				key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/" + keys.get("SORT_FINISHED"));
-				key.setReduceType("JobLogHistoryReduceProcessor");
-				record = new ChukwaRecord();
-				record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
-				record.add("JOBID",keys.get("JOBID"));
-				record.add("SORT_FINISHED", keys.get("SORT_FINISHED"));
-				record.add(Record.tagsField, chunk.getTags());
-//				log.info("JobLogHist/SORT/E");
-				output.collect(key, record);
-
-				// Reduce
-				key = new ChukwaRecordKey();
-				key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/" + keys.get("SORT_FINISHED"));
-				key.setReduceType("JobLogHistoryReduceProcessor");
-				record = new ChukwaRecord();
-				record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
-				record.add("JOBID",keys.get("JOBID"));
-				record.add("START_TIME", keys.get("SORT_FINISHED"));
-				record.add(Record.tagsField, chunk.getTags());
-//				log.info("JobLogHist/REDUCE/S");
-				output.collect(key, record);
-
-				key = new ChukwaRecordKey();
-				key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/" + keys.get("FINISH_TIME"));
-				key.setReduceType("JobLogHistoryReduceProcessor");
-				record = new ChukwaRecord();
-				record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
-				record.add("JOBID",keys.get("JOBID"));
-				record.add("FINISH_TIME", keys.get("SORT_FINISHED"));
-				record.add(Record.tagsField, chunk.getTags());
-//				log.info("JobLogHist/REDUCE/E");
-				output.collect(key, record);
-
-			} else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job")
-					&& keys.containsKey("COUNTERS"))
-			{
-				// Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906816"
-				// JOB_STATUS="SUCCESS" FINISHED_MAPS="5912"
-				// FINISHED_REDUCES="739" FAILED_MAPS="0" FAILED_REDUCES="0"
-				// COUNTERS="File Systems.Local bytes read:1735053407244,File
-				// Systems.Local bytes written:2610106384012,File Systems.HDFS
-				// bytes read:801605644910,File Systems.HDFS bytes
-				// written:44135800,
-				// Job Counters .Launched map tasks:5912,Job Counters .Launched
-				// reduce tasks:739,Job Counters .Data-local map tasks:5573,Job
-				// Counters .Rack-local map tasks:316,Map-Reduce Framework.
-				// Map input records:9410696067,Map-Reduce Framework.Map output
-				// records:9410696067,Map-Reduce Framework.Map input
-				// bytes:801599188816,Map-Reduce Framework.Map output
-				// bytes:784427968116,
-				// Map-Reduce Framework.Combine input records:0,Map-Reduce
-				// Framework.Combine output records:0,Map-Reduce
-				// Framework.Reduce input groups:477265,Map-Reduce
-				// Framework.Reduce input records:739000,
-				// Map-Reduce Framework.Reduce output records:739000"
-
-				record = new ChukwaRecord();
-				key = new ChukwaRecordKey();
-				buildGenericRecord(record, null, Long.parseLong(keys.get("FINISH_TIME")), "MRJobCounters");
-				extractCounters(record, keys.get("COUNTERS"));
-
-				String jobId = keys.get("JOBID").replace("_", "").substring(3);
-				record.add("JobId", "" + jobId);
-
-				// FIXME validate this when HodId will be available
-				if (keys.containsKey("HODID"))
-					{ record.add("HodId", keys.get("HODID")); }
-
-//				log.info("MRJobCounters +1");
-				output.collect(key, record);
-			}
-
-			if (keys.containsKey("TASK_TYPE")
-					&& keys.containsKey("COUNTERS")
-					&& (keys.get("TASK_TYPE").equalsIgnoreCase("REDUCE") || keys
-							.get("TASK_TYPE").equalsIgnoreCase("MAP")))
-			{
-				// MAP
-				// Task TASKID="tip_200804210403_0005_m_000154" TASK_TYPE="MAP"
-				// TASK_STATUS="SUCCESS" FINISH_TIME="1208760463883"
-				// COUNTERS="File Systems.Local bytes read:159265655,File
-				// Systems.Local bytes written:318531310,
-				// File Systems.HDFS bytes read:145882417,Map-Reduce
-				// Framework.Map input records:1706604,
-				// Map-Reduce Framework.Map output records:1706604,Map-Reduce
-				// Framework.Map input bytes:145882057,
-				// Map-Reduce Framework.Map output bytes:142763253,Map-Reduce
-				// Framework.Combine input records:0,Map-Reduce
-				// Framework.Combine output records:0"
-
-				// REDUCE
-				// Task TASKID="tip_200804210403_0005_r_000524"
-				// TASK_TYPE="REDUCE" TASK_STATUS="SUCCESS"
-				// FINISH_TIME="1208760877072"
-				// COUNTERS="File Systems.Local bytes read:1179319677,File
-				// Systems.Local bytes written:1184474889,File Systems.HDFS
-				// bytes written:59021,
-				// Map-Reduce Framework.Reduce input groups:684,Map-Reduce
-				// Framework.Reduce input records:1000,Map-Reduce
-				// Framework.Reduce output records:1000"
-
-				record = new ChukwaRecord();
-				key = new ChukwaRecordKey();
-				buildGenericRecord(record, null, Long.parseLong(keys.get("FINISH_TIME")), "SizeVsFinishTime");
-				extractCounters(record, keys.get("COUNTERS"));
-				record.add("JOBID",keys.get("JOBID"));
-				record.add("TASKID", keys.get("TASKID"));
-				record.add("TASK_TYPE", keys.get("TASK_TYPE"));
-				
-//				log.info("MR_Graph +1");
-				output.collect(key, record);
-
-			}
-		} 
-		catch (IOException e)
-		{
-			log.warn("Unable to collect output in JobLogHistoryProcessor ["
-					+ recordEntry + "]", e);
-			e.printStackTrace();
-			throw e;
-		}
-
-	}
-
-	protected void extractCounters(ChukwaRecord record, String input)
-	{
-
-		String[] data = null;
-		String[] counters = input.split(",");
-
-		for (String counter : counters)
-		{
-			data = counter.split(":");
-			record.add(data[0].replaceAll(" ", "_").replaceAll("\\.", "_")
-					.toUpperCase(), data[1]);
-		}
-	}
-
-	public String getDataType()
-	{
-		return Log4jJobHistoryProcessor.recordType;
-	}
+public class Log4jJobHistoryProcessor extends AbstractProcessor {
+  static Logger log = Logger.getLogger(Log4jJobHistoryProcessor.class);
+
+  private static final String recordType = "JobLogHistory";
+  private static String internalRegex = null;
+  private static Pattern ip = null;
+
+  private Matcher internalMatcher = null;
+
+  public Log4jJobHistoryProcessor() {
+    internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?";
+    ip = Pattern.compile(internalRegex);
+    internalMatcher = ip.matcher("-");
+  }
+
+  @Override
+  protected void parse(String recordEntry,
+      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+      throws Throwable {
+
+    // log.info("JobLogHistoryProcessor record: [" + recordEntry + "] type["
+    // + chunk.getDataType() + "]");
+
+    try {
+
+      // String dStr = recordEntry.substring(0, 23);
+      int start = 24;
+      int idx = recordEntry.indexOf(' ', start);
+      // String level = recordEntry.substring(start, idx);
+      start = idx + 1;
+      idx = recordEntry.indexOf(' ', start);
+      // String className = recordEntry.substring(start, idx-1);
+      String body = recordEntry.substring(idx + 1);
+
+      Hashtable<String, String> keys = new Hashtable<String, String>();
+      ChukwaRecord record = null;
+
+      int firstSep = body.indexOf(" ");
+      keys.put("RECORD_TYPE", body.substring(0, firstSep));
+      // log.info("JobLogHistoryProcessor Add field: [RECORD_TYPE]["
+      // + keys.get("RECORD_TYPE") + "]");
+
+      body = body.substring(firstSep);
+
+      internalMatcher.reset(body);
+
+      // String fieldName = null;
+      // String fieldValue = null;
+
+      while (internalMatcher.matches()) {
+
+        keys.put(internalMatcher.group(1).trim(), internalMatcher.group(2)
+            .trim());
+
+        // TODO Remove debug info before production
+        // fieldName = internalMatcher.group(1).trim();
+        // fieldValue = internalMatcher.group(2).trim();
+        // log.info("JobLogHistoryProcessor Add field: [" + fieldName +
+        // "][" + fieldValue +"]" );
+        // log.info("EOL : [" + internalMatcher.group(3) + "]" );
+        internalMatcher.reset(internalMatcher.group(3));
+      }
+
+      if (!keys.containsKey("JOBID")) {
+        // Extract JobID from taskID
+        // JOBID = "job_200804210403_0005"
+        // TASKID = "tip_200804210403_0005_m_000018"
+        String jobId = keys.get("TASKID");
+        int idx1 = jobId.indexOf('_', 0);
+        int idx2 = jobId.indexOf('_', idx1 + 1);
+        idx2 = jobId.indexOf('_', idx2 + 1);
+        keys.put("JOBID", "job" + jobId.substring(idx1, idx2));
+        // log.info("JobLogHistoryProcessor Add field: [JOBID]["
+        // + keys.get("JOBID") + "]");
+      }
+
+      // if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
+      // keys.containsKey("SUBMIT_TIME"))
+      // {
+      // // Job JOBID="job_200804210403_0005" JOBNAME="MY_JOB"
+      // USER="userxxx"
+      // // SUBMIT_TIME="1208760436751"
+      // JOBCONF="/mapredsystem/xxx.yyy.com/job_200804210403_0005/job.xml"
+      //					
+      //					
+      // }
+      // else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
+      // keys.containsKey("LAUNCH_TIME"))
+      // {
+      // // Job JOBID="job_200804210403_0005" LAUNCH_TIME="1208760437110"
+      // TOTAL_MAPS="5912" TOTAL_REDUCES="739"
+      //					
+      // }
+      // else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job") &&
+      // keys.containsKey("FINISH_TIME"))
+      // {
+      // // Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906816"
+      // JOB_STATUS="SUCCESS" FINISHED_MAPS="5912" FINISHED_REDUCES="739"
+      // FAILED_MAPS="0" FAILED_REDUCES="0"
+      // // COUNTERS="File Systems.Local bytes read:1735053407244,File
+      // Systems.Local bytes written:2610106384012,File Systems.HDFS bytes
+      // read:801605644910,File Systems.HDFS bytes written:44135800,
+      // // Job Counters .Launched map tasks:5912,Job Counters .Launched
+      // reduce tasks:739,Job Counters .Data-local map tasks:5573,Job
+      // Counters .Rack-local map tasks:316,Map-Reduce Framework.
+      // // Map input records:9410696067,Map-Reduce Framework.Map output
+      // records:9410696067,Map-Reduce Framework.Map input
+      // bytes:801599188816,Map-Reduce Framework.Map output
+      // bytes:784427968116,
+      // // Map-Reduce Framework.Combine input records:0,Map-Reduce
+      // Framework.Combine output records:0,Map-Reduce Framework.Reduce
+      // input groups:477265,Map-Reduce Framework.Reduce input
+      // records:739000,
+      // // Map-Reduce Framework.Reduce output records:739000"
+      //					
+      // }
+      // else
+      if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
+          && keys.containsKey("START_TIME")) {
+        // MapAttempt TASK_TYPE="MAP"
+        // TASKID="tip_200804210403_0005_m_000018"
+        // TASK_ATTEMPT_ID="task_200804210403_0005_m_000018_0"
+        // START_TIME="1208760437531"
+        // HOSTNAME="tracker_xxx.yyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:53734"
+
+        key = new ChukwaRecordKey();
+        key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/"
+            + keys.get("START_TIME"));
+        key.setReduceType("JobLogHistoryReduceProcessor");
+        record = new ChukwaRecord();
+        record.setTime(Long.parseLong(keys.get("START_TIME")));
+        record.add("JOBID", keys.get("JOBID"));
+        record.add("START_TIME", keys.get("START_TIME"));
+        record.add(Record.tagsField, chunk.getTags());
+        // log.info("JobLogHist/Map/S");
+        output.collect(key, record);
+
+      } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("MapAttempt")
+          && keys.containsKey("FINISH_TIME")) {
+        // MapAttempt TASK_TYPE="MAP"
+        // TASKID="tip_200804210403_0005_m_005494"
+        // TASK_ATTEMPT_ID="task_200804210403_0005_m_005494_0"
+        // TASK_STATUS="SUCCESS"
+        // FINISH_TIME="1208760624124"
+        // HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:55491"
+
+        key = new ChukwaRecordKey();
+        key.setKey("JobLogHist/Map/" + keys.get("JOBID") + "/"
+            + keys.get("FINISH_TIME"));
+        key.setReduceType("JobLogHistoryReduceProcessor");
+        record = new ChukwaRecord();
+        record.setTime(Long.parseLong(keys.get("FINISH_TIME")));
+        record.add("JOBID", keys.get("JOBID"));
+        record.add("FINISH_TIME", keys.get("FINISH_TIME"));
+        record.add(Record.tagsField, chunk.getTags());
+        // log.info("JobLogHist/Map/E");
+        output.collect(key, record);
+      }
+
+      else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt")
+          && keys.containsKey("START_TIME")) {
+        // ReduceAttempt TASK_TYPE="REDUCE"
+        // TASKID="tip_200804210403_0005_r_000138"
+        // TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0"
+        // START_TIME="1208760454885"
+        // HOSTNAME="tracker_xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947"
+
+        key = new ChukwaRecordKey();
+        key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/"
+            + keys.get("START_TIME"));
+        key.setReduceType("JobLogHistoryReduceProcessor");
+        record = new ChukwaRecord();
+        record.setTime(Long.parseLong(keys.get("START_TIME")));
+        record.add("JOBID", keys.get("JOBID"));
+        record.add("START_TIME", keys.get("START_TIME"));
+        record.add(Record.tagsField, chunk.getTags());
+        // log.info("JobLogHist/SHUFFLE/S");
+        output.collect(key, record);
+
+      } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("ReduceAttempt")
+          && keys.containsKey("FINISH_TIME")) {
+        // ReduceAttempt TASK_TYPE="REDUCE"
+        // TASKID="tip_200804210403_0005_r_000138"
+        // TASK_ATTEMPT_ID="task_200804210403_0005_r_000138_0"
+        // TASK_STATUS="SUCCESS" SHUFFLE_FINISHED="1208760787167"
+        // SORT_FINISHED="1208760787354" FINISH_TIME="1208760802395"
+        // HOSTNAME="tracker__xxxx.yyyy.com:xxx.yyy.com/xxx.xxx.xxx.xxx:51947"
+
+        key = new ChukwaRecordKey();
+        key.setKey("JobLogHist/SHUFFLE/" + keys.get("JOBID") + "/"
+            + keys.get("SHUFFLE_FINISHED"));
+        key.setReduceType("JobLogHistoryReduceProcessor");
+        record = new ChukwaRecord();
+        record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
+        record.add("JOBID", keys.get("JOBID"));
+        record.add("SHUFFLE_FINISHED", keys.get("SHUFFLE_FINISHED"));
+        record.add(Record.tagsField, chunk.getTags());
+        // log.info("JobLogHist/SHUFFLE/E");
+        output.collect(key, record);
+
+        // SORT
+        key = new ChukwaRecordKey();
+        key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/"
+            + keys.get("SHUFFLE_FINISHED"));
+        key.setReduceType("JobLogHistoryReduceProcessor");
+        record = new ChukwaRecord();
+        record.setTime(Long.parseLong(keys.get("SHUFFLE_FINISHED")));
+        record.add("JOBID", keys.get("JOBID"));
+        record.add("START_TIME", keys.get("SHUFFLE_FINISHED"));
+        record.add(Record.tagsField, chunk.getTags());
+        // log.info("JobLogHist/SORT/S");
+        output.collect(key, record);
+
+        key = new ChukwaRecordKey();
+        key.setKey("JobLogHist/SORT/" + keys.get("JOBID") + "/"
+            + keys.get("SORT_FINISHED"));
+        key.setReduceType("JobLogHistoryReduceProcessor");
+        record = new ChukwaRecord();
+        record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
+        record.add("JOBID", keys.get("JOBID"));
+        record.add("SORT_FINISHED", keys.get("SORT_FINISHED"));
+        record.add(Record.tagsField, chunk.getTags());
+        // log.info("JobLogHist/SORT/E");
+        output.collect(key, record);
+
+        // Reduce
+        key = new ChukwaRecordKey();
+        key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/"
+            + keys.get("SORT_FINISHED"));
+        key.setReduceType("JobLogHistoryReduceProcessor");
+        record = new ChukwaRecord();
+        record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
+        record.add("JOBID", keys.get("JOBID"));
+        record.add("START_TIME", keys.get("SORT_FINISHED"));
+        record.add(Record.tagsField, chunk.getTags());
+        // log.info("JobLogHist/REDUCE/S");
+        output.collect(key, record);
+
+        key = new ChukwaRecordKey();
+        key.setKey("JobLogHist/REDUCE/" + keys.get("JOBID") + "/"
+            + keys.get("FINISH_TIME"));
+        key.setReduceType("JobLogHistoryReduceProcessor");
+        record = new ChukwaRecord();
+        record.setTime(Long.parseLong(keys.get("SORT_FINISHED")));
+        record.add("JOBID", keys.get("JOBID"));
+        record.add("FINISH_TIME", keys.get("SORT_FINISHED"));
+        record.add(Record.tagsField, chunk.getTags());
+        // log.info("JobLogHist/REDUCE/E");
+        output.collect(key, record);
+
+      } else if (keys.get("RECORD_TYPE").equalsIgnoreCase("Job")
+          && keys.containsKey("COUNTERS")) {
+        // Job JOBID="job_200804210403_0005" FINISH_TIME="1208760906816"
+        // JOB_STATUS="SUCCESS" FINISHED_MAPS="5912"
+        // FINISHED_REDUCES="739" FAILED_MAPS="0" FAILED_REDUCES="0"
+        // COUNTERS="File Systems.Local bytes read:1735053407244,File
+        // Systems.Local bytes written:2610106384012,File Systems.HDFS
+        // bytes read:801605644910,File Systems.HDFS bytes
+        // written:44135800,
+        // Job Counters .Launched map tasks:5912,Job Counters .Launched
+        // reduce tasks:739,Job Counters .Data-local map tasks:5573,Job
+        // Counters .Rack-local map tasks:316,Map-Reduce Framework.
+        // Map input records:9410696067,Map-Reduce Framework.Map output
+        // records:9410696067,Map-Reduce Framework.Map input
+        // bytes:801599188816,Map-Reduce Framework.Map output
+        // bytes:784427968116,
+        // Map-Reduce Framework.Combine input records:0,Map-Reduce
+        // Framework.Combine output records:0,Map-Reduce
+        // Framework.Reduce input groups:477265,Map-Reduce
+        // Framework.Reduce input records:739000,
+        // Map-Reduce Framework.Reduce output records:739000"
+
+        record = new ChukwaRecord();
+        key = new ChukwaRecordKey();
+        buildGenericRecord(record, null, Long
+            .parseLong(keys.get("FINISH_TIME")), "MRJobCounters");
+        extractCounters(record, keys.get("COUNTERS"));
+
+        String jobId = keys.get("JOBID").replace("_", "").substring(3);
+        record.add("JobId", "" + jobId);
+
+        // FIXME: validate this once HodId is available
+        if (keys.containsKey("HODID")) {
+          record.add("HodId", keys.get("HODID"));
+        }
+
+        // log.info("MRJobCounters +1");
+        output.collect(key, record);
+      }
+
+      if (keys.containsKey("TASK_TYPE")
+          && keys.containsKey("COUNTERS")
+          && (keys.get("TASK_TYPE").equalsIgnoreCase("REDUCE") || keys.get(
+              "TASK_TYPE").equalsIgnoreCase("MAP"))) {
+        // MAP
+        // Task TASKID="tip_200804210403_0005_m_000154" TASK_TYPE="MAP"
+        // TASK_STATUS="SUCCESS" FINISH_TIME="1208760463883"
+        // COUNTERS="File Systems.Local bytes read:159265655,File
+        // Systems.Local bytes written:318531310,
+        // File Systems.HDFS bytes read:145882417,Map-Reduce
+        // Framework.Map input records:1706604,
+        // Map-Reduce Framework.Map output records:1706604,Map-Reduce
+        // Framework.Map input bytes:145882057,
+        // Map-Reduce Framework.Map output bytes:142763253,Map-Reduce
+        // Framework.Combine input records:0,Map-Reduce
+        // Framework.Combine output records:0"
+
+        // REDUCE
+        // Task TASKID="tip_200804210403_0005_r_000524"
+        // TASK_TYPE="REDUCE" TASK_STATUS="SUCCESS"
+        // FINISH_TIME="1208760877072"
+        // COUNTERS="File Systems.Local bytes read:1179319677,File
+        // Systems.Local bytes written:1184474889,File Systems.HDFS
+        // bytes written:59021,
+        // Map-Reduce Framework.Reduce input groups:684,Map-Reduce
+        // Framework.Reduce input records:1000,Map-Reduce
+        // Framework.Reduce output records:1000"
+
+        record = new ChukwaRecord();
+        key = new ChukwaRecordKey();
+        buildGenericRecord(record, null, Long
+            .parseLong(keys.get("FINISH_TIME")), "SizeVsFinishTime");
+        extractCounters(record, keys.get("COUNTERS"));
+        record.add("JOBID", keys.get("JOBID"));
+        record.add("TASKID", keys.get("TASKID"));
+        record.add("TASK_TYPE", keys.get("TASK_TYPE"));
+
+        // log.info("MR_Graph +1");
+        output.collect(key, record);
+
+      }
+    } catch (IOException e) {
+      log.warn("Unable to collect output in JobLogHistoryProcessor ["
+          + recordEntry + "]", e);
+      e.printStackTrace();
+      throw e;
+    }
+
+  }
+
+  protected void extractCounters(ChukwaRecord record, String input) {
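+    // COUNTERS is a comma-separated list of "counter name:value" pairs;
+    // names are normalized below (spaces/dots -> underscores, upper-cased).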
+
+    String[] data = null;
+    String[] counters = input.split(",");
+
+    for (String counter : counters) {
+      data = counter.split(":");
+      record.add(data[0].replaceAll(" ", "_").replaceAll("\\.", "_")
+          .toUpperCase(), data[1]);
+    }
+  }
+
+  public String getDataType() {
+    return Log4jJobHistoryProcessor.recordType;
+  }
 }
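
For reference, the counter handling above can be exercised in isolation. The following is a minimal standalone sketch; the sample COUNTERS value is hypothetical, and the normalization mirrors extractCounters (spaces and dots become underscores, names are upper-cased):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class CountersDemo {
      // Mirrors JobLogHistoryProcessor.extractCounters: comma-separated
      // "name:value" pairs, names normalized before insertion.
      static Map<String, String> extractCounters(String input) {
        Map<String, String> fields = new LinkedHashMap<String, String>();
        for (String counter : input.split(",")) {
          String[] data = counter.split(":");
          fields.put(data[0].replaceAll(" ", "_").replaceAll("\\.", "_")
              .toUpperCase(), data[1]);
        }
        return fields;
      }

      public static void main(String[] args) {
        // Hypothetical sample, shaped like the COUNTERS strings above.
        String counters = "File Systems.HDFS bytes read:145882417,"
            + "Map-Reduce Framework.Map input records:1706604";
        // Prints {FILE_SYSTEMS_HDFS_BYTES_READ=145882417,
        //         MAP-REDUCE_FRAMEWORK_MAP_INPUT_RECORDS=1706604}
        System.out.println(extractCounters(counters));
      }
    }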

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessor.java Wed Mar 11 22:39:26 2009
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
+
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
@@ -25,7 +26,7 @@
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
-public interface MapProcessor
-{
-	public void process(ChukwaArchiveKey archiveKey,Chunk chunk,OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter);
+public interface MapProcessor {
+  public void process(ChukwaArchiveKey archiveKey, Chunk chunk,
+      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter);
 }
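
A processor implementing this interface only needs to turn a Chunk into ChukwaRecords. A minimal hypothetical implementation, for illustration only (the class name is not part of this commit), might look like:

    package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;

    import org.apache.hadoop.chukwa.ChukwaArchiveKey;
    import org.apache.hadoop.chukwa.Chunk;
    import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
    import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    // Hypothetical example, not part of this commit.
    public class NoOpMapProcessor implements MapProcessor {
      public void process(ChukwaArchiveKey archiveKey, Chunk chunk,
          OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
          Reporter reporter) {
        // A real processor would parse chunk.getData() into one or more
        // ChukwaRecords and emit them via output.collect(key, record).
      }
    }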

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessorFactory.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessorFactory.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/MapProcessorFactory.java Wed Mar 11 22:39:26 2009
@@ -18,49 +18,38 @@
 
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
-import java.util.HashMap;
 
+import java.util.HashMap;
 import org.apache.log4j.Logger;
 
+public class MapProcessorFactory {
+  static Logger log = Logger.getLogger(MapProcessorFactory.class);
+
+  private static HashMap<String, MapProcessor> processors = new HashMap<String, MapProcessor>(); // registry
 
+  private MapProcessorFactory() {
+  }
 
-public class MapProcessorFactory
-{
-	static Logger log = Logger.getLogger(MapProcessorFactory.class);	
-	
-	private static HashMap<String,MapProcessor > processors =
-	    new HashMap<String, MapProcessor>(); // registry
-		
-	private MapProcessorFactory()
-	{}
-	
-	public static MapProcessor getProcessor(String parserClass)
-	 throws UnknownRecordTypeException
-	{
-		if (processors.containsKey(parserClass)) 
-		{
-			return processors.get(parserClass);
-		} 
-		else 
-		{
-			MapProcessor processor = null;
-			try 
-			{
-				processor = (MapProcessor)Class.forName(parserClass).getConstructor().newInstance();
-			} 
-			catch(ClassNotFoundException e) 
-			{
-				throw new UnknownRecordTypeException("Unknown parserClass:" + parserClass, e);			
-			}
-			catch(Exception e) 
-			{
-			  throw new UnknownRecordTypeException("error constructing processor", e);
-			}
-			
-			//TODO using a ThreadSafe/reuse flag to actually decide if we want 
-			// to reuse the same processor again and again
-			processors.put(parserClass,processor);
-			return processor;
-		}
-	}
+  public static MapProcessor getProcessor(String parserClass)
+      throws UnknownRecordTypeException {
+    if (processors.containsKey(parserClass)) {
+      return processors.get(parserClass);
+    } else {
+      MapProcessor processor = null;
+      try {
+        processor = (MapProcessor) Class.forName(parserClass).getConstructor()
+            .newInstance();
+      } catch (ClassNotFoundException e) {
+        throw new UnknownRecordTypeException("Unknown parserClass:"
+            + parserClass, e);
+      } catch (Exception e) {
+        throw new UnknownRecordTypeException("error constructing processor", e);
+      }
+
+      // TODO: use a thread-safe/reuse flag to decide whether to reuse
+      // the same processor instance across calls
+      processors.put(parserClass, processor);
+      return processor;
+    }
+  }
 }
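
Typical use of the factory is to resolve a processor by fully qualified class name and let the registry cache it. A hedged usage sketch follows; the driver method and the TsProcessor class name are illustrative only:

    package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;

    import org.apache.hadoop.chukwa.ChukwaArchiveKey;
    import org.apache.hadoop.chukwa.Chunk;
    import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
    import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    public class FactoryUsageDemo {
      // Hypothetical driver; the parser class name is illustrative.
      static void demux(ChukwaArchiveKey archiveKey, Chunk chunk,
          OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
          Reporter reporter) throws UnknownRecordTypeException {
        MapProcessor p = MapProcessorFactory.getProcessor(
            "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.TsProcessor");
        p.process(archiveKey, chunk, output, reporter);
      }
    }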

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsInvalidEntry.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsInvalidEntry.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsInvalidEntry.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsInvalidEntry.java Wed Mar 11 22:39:26 2009
@@ -18,30 +18,27 @@
 
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
-public class PbsInvalidEntry extends Exception
-{
 
-	/**
+public class PbsInvalidEntry extends Exception {
+
+  /**
 	 * 
 	 */
-	private static final long serialVersionUID = 9154096600390233023L;
+  private static final long serialVersionUID = 9154096600390233023L;
 
-	public PbsInvalidEntry()
-	{}
+  public PbsInvalidEntry() {
+  }
 
-	public PbsInvalidEntry(String message)
-	{
-		super(message);
-	}
-
-	public PbsInvalidEntry(Throwable cause)
-	{
-		super(cause);
-	}
-
-	public PbsInvalidEntry(String message, Throwable cause)
-	{
-		super(message, cause);
-	}
+  public PbsInvalidEntry(String message) {
+    super(message);
+  }
+
+  public PbsInvalidEntry(Throwable cause) {
+    super(cause);
+  }
+
+  public PbsInvalidEntry(String message, Throwable cause) {
+    super(message, cause);
+  }
 
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsNodes.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsNodes.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsNodes.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsNodes.java Wed Mar 11 22:39:26 2009
@@ -18,215 +18,181 @@
 
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
+
 import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
-
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
 
-public class PbsNodes extends AbstractProcessor
-{
-	static Logger log = Logger.getLogger(PbsNodes.class);
-
-	private static final String rawPBSRecordType = "PbsNodes";
-	private static final String machinePBSRecordType = "MachinePbsNodes";
-	private SimpleDateFormat sdf = null;
-
-	public PbsNodes()
-	{
-		//TODO move that to config
-		sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
-	}
-
-	@Override
-	protected void parse(String recordEntry, OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
-			Reporter reporter)
-	 throws Throwable
-	{
-		
-//		log.info("PbsNodeProcessor record: [" + recordEntry + "] type[" + chunk.getDataType() + "]");
-		
-		
-
-		StringBuilder sb = new StringBuilder(); 	 
-		int i = 0;
-		String nodeActivityStatus = null;
-		StringBuilder sbFreeMachines = new StringBuilder();
-		StringBuilder sbUsedMachines = new StringBuilder();
-		StringBuilder sbDownMachines = new StringBuilder();
-		
-		int totalFreeNode = 0;
-		int totalUsedNode = 0;
-		int totalDownNode = 0;
-	
-		String body = null;
-		ChukwaRecord record = null;
-		
-		
-			try
-			{
-				
-				String dStr = recordEntry.substring(0, 23);
-				int start = 24;
-				int idx = recordEntry.indexOf(' ', start);
-				//String level = recordEntry.substring(start, idx);
-				start = idx+1;
-				idx = recordEntry.indexOf(' ', start );
-				//String className = recordEntry.substring(start, idx-1);
-				body = recordEntry.substring(idx+1);
-				
-				
-				Date d = sdf.parse(dStr );
-				
-				String[] lines = body.split("\n");
-				while (i < lines.length)
-				{
-					while ((i < lines.length) && (lines[i].trim().length() > 0))
-					{
-						sb.append(lines[i].trim()).append("\n");
-						i++;
-					}
-
-					 if ( (i< lines.length) && (lines[i].trim().length() > 0) )
-					 {
-					 throw new PbsInvalidEntry(recordEntry);
-					 }
-
-					// Empty line
-					i++;
-
-					if (sb.length() > 0)
-					{
-						body =  sb.toString();
-						// Process all entries for a machine
-						//System.out.println("=========>>> Record [" + body+ "]");
-						
-						record = new ChukwaRecord();
-						key = new ChukwaRecordKey();
-						
-						buildGenericRecord(record,null,d.getTime(),machinePBSRecordType);
-						parsePbsRecord(body, record);
-						
-						//Output PbsNode record for 1 machine
-						output.collect(key, record);
-						//log.info("PbsNodeProcessor output 1 sub-record");
-						
-						//compute Node Activity information
-						nodeActivityStatus = record.getValue("state");
-						if (nodeActivityStatus != null)
-						{
-							if (nodeActivityStatus.equals("free"))
-							{
-								totalFreeNode ++;
-								sbFreeMachines.append(record.getValue("Machine")).append(",");
-							}
-							else if (nodeActivityStatus.equals("job-exclusive"))
-							{
-								totalUsedNode ++;
-								sbUsedMachines.append(record.getValue("Machine")).append(",");
-							}
-							else
-							{
-								totalDownNode ++;
-								sbDownMachines.append(record.getValue("Machine")).append(",");
-							}					
-						}
-						sb = new StringBuilder();
-					}
-				}
-				
-				// End of parsing
-
-				record = new ChukwaRecord();
-				key = new ChukwaRecordKey();
-				buildGenericRecord(record,null,d.getTime(),"NodeActivity");
-				
-				record.setTime(d.getTime());
-				record.add("used", ""+totalUsedNode);
-				record.add("free", ""+totalFreeNode);
-				record.add("down", ""+totalDownNode);
-				record.add("usedMachines", sbUsedMachines.toString());
-				record.add("freeMachines", sbFreeMachines.toString());
-				record.add("downMachines", sbDownMachines.toString());
-				
-				output.collect(key,record);
-				//log.info("PbsNodeProcessor output 1 NodeActivity");					
-			}
-			catch (ParseException e)
-			{
-				e.printStackTrace();
-				log.warn("Wrong format in PbsNodesProcessor [" + recordEntry + "]", e);
-				throw e;
-			}
-			catch (IOException e)
-			{
-				log.warn("Unable to collect output in PbsNodesProcessor [" + recordEntry + "]", e);
-				e.printStackTrace();
-				throw e;
-			}
-			catch (PbsInvalidEntry e)
-			{
-				log.warn("Wrong format in PbsNodesProcessor [" + recordEntry + "]", e);
-				e.printStackTrace();
-				throw e;
-			}
-		
-		
-
-		
-	}
-
-	protected static void parsePbsRecord(String recordLine, ChukwaRecord record)
-	{
-		int i = 0;
-		String[] lines = recordLine.split("\n");
-		record.add("Machine", lines[0]);
-		
-		i++;
-		String[] data = null;
-		while (i < lines.length)
-		{
-			data = extractFields(lines[i]);	
-			record.add(data[0].trim(), data[1].trim());
-			if (data[0].trim().equalsIgnoreCase("status"))
-			{
-				parseStatusField(data[1].trim(), record);
-			}
-			i++;
-		}
-	}
-
-	protected static void parseStatusField(String statusField,
-			ChukwaRecord record)
-	{
-		String[] data = null;
-		String[] subFields = statusField.trim().split(",");
-		for (String subflied : subFields)
-		{
-			data = extractFields(subflied);
-			record.add("status-"+data[0].trim(), data[1].trim());
-		}
-	}
-
-
-	static String[] extractFields(String line)
-	{
-		String[] args = new String[2];
-		int index = line.indexOf("=");
-		args[0] = line.substring(0,index );
-		args[1] = line.substring(index + 1);
-
-		return args;
-	}
-
-	public String getDataType()
-	{
-		return PbsNodes.rawPBSRecordType;
-	}
-	
+public class PbsNodes extends AbstractProcessor {
+  static Logger log = Logger.getLogger(PbsNodes.class);
+
+  private static final String rawPBSRecordType = "PbsNodes";
+  private static final String machinePBSRecordType = "MachinePbsNodes";
+  private SimpleDateFormat sdf = null;
+
+  public PbsNodes() {
+    // TODO: move this to config
+    sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
+  }
+
+  @Override
+  protected void parse(String recordEntry,
+      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+      throws Throwable {
+
+    // log.info("PbsNodeProcessor record: [" + recordEntry + "] type[" +
+    // chunk.getDataType() + "]");
+
+    StringBuilder sb = new StringBuilder();
+    int i = 0;
+    String nodeActivityStatus = null;
+    StringBuilder sbFreeMachines = new StringBuilder();
+    StringBuilder sbUsedMachines = new StringBuilder();
+    StringBuilder sbDownMachines = new StringBuilder();
+
+    int totalFreeNode = 0;
+    int totalUsedNode = 0;
+    int totalDownNode = 0;
+
+    String body = null;
+    ChukwaRecord record = null;
+
+    try {
+
+      String dStr = recordEntry.substring(0, 23);
+      int start = 24;
+      int idx = recordEntry.indexOf(' ', start);
+      // String level = recordEntry.substring(start, idx);
+      start = idx + 1;
+      idx = recordEntry.indexOf(' ', start);
+      // String className = recordEntry.substring(start, idx-1);
+      body = recordEntry.substring(idx + 1);
+
+      Date d = sdf.parse(dStr);
+
+      String[] lines = body.split("\n");
+      while (i < lines.length) {
+        while ((i < lines.length) && (lines[i].trim().length() > 0)) {
+          sb.append(lines[i].trim()).append("\n");
+          i++;
+        }
+
+        if ((i < lines.length) && (lines[i].trim().length() > 0)) {
+          throw new PbsInvalidEntry(recordEntry);
+        }
+
+        // Skip the blank line separating machine entries
+        i++;
+
+        if (sb.length() > 0) {
+          body = sb.toString();
+          // Process all entries for a machine
+          // System.out.println("=========>>> Record [" + body+ "]");
+
+          record = new ChukwaRecord();
+          key = new ChukwaRecordKey();
+
+          buildGenericRecord(record, null, d.getTime(), machinePBSRecordType);
+          parsePbsRecord(body, record);
+
+          // Output PbsNode record for 1 machine
+          output.collect(key, record);
+          // log.info("PbsNodeProcessor output 1 sub-record");
+
+          // compute Node Activity information
+          nodeActivityStatus = record.getValue("state");
+          if (nodeActivityStatus != null) {
+            if (nodeActivityStatus.equals("free")) {
+              totalFreeNode++;
+              sbFreeMachines.append(record.getValue("Machine")).append(",");
+            } else if (nodeActivityStatus.equals("job-exclusive")) {
+              totalUsedNode++;
+              sbUsedMachines.append(record.getValue("Machine")).append(",");
+            } else {
+              totalDownNode++;
+              sbDownMachines.append(record.getValue("Machine")).append(",");
+            }
+          }
+          sb = new StringBuilder();
+        }
+      }
+
+      // End of parsing
+
+      record = new ChukwaRecord();
+      key = new ChukwaRecordKey();
+      buildGenericRecord(record, null, d.getTime(), "NodeActivity");
+
+      record.setTime(d.getTime());
+      record.add("used", "" + totalUsedNode);
+      record.add("free", "" + totalFreeNode);
+      record.add("down", "" + totalDownNode);
+      record.add("usedMachines", sbUsedMachines.toString());
+      record.add("freeMachines", sbFreeMachines.toString());
+      record.add("downMachines", sbDownMachines.toString());
+
+      output.collect(key, record);
+      // log.info("PbsNodeProcessor output 1 NodeActivity");
+    } catch (ParseException e) {
+      e.printStackTrace();
+      log.warn("Wrong format in PbsNodesProcessor [" + recordEntry + "]", e);
+      throw e;
+    } catch (IOException e) {
+      log.warn("Unable to collect output in PbsNodesProcessor [" + recordEntry
+          + "]", e);
+      e.printStackTrace();
+      throw e;
+    } catch (PbsInvalidEntry e) {
+      log.warn("Wrong format in PbsNodesProcessor [" + recordEntry + "]", e);
+      e.printStackTrace();
+      throw e;
+    }
+
+  }
+
+  protected static void parsePbsRecord(String recordLine, ChukwaRecord record) {
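+    // First line is the machine name; remaining lines are "name=value"
+    // attributes, with the "status" attribute further split into sub-fields.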
+    int i = 0;
+    String[] lines = recordLine.split("\n");
+    record.add("Machine", lines[0]);
+
+    i++;
+    String[] data = null;
+    while (i < lines.length) {
+      data = extractFields(lines[i]);
+      record.add(data[0].trim(), data[1].trim());
+      if (data[0].trim().equalsIgnoreCase("status")) {
+        parseStatusField(data[1].trim(), record);
+      }
+      i++;
+    }
+  }
+
+  protected static void parseStatusField(String statusField, ChukwaRecord record) {
+    String[] data = null;
+    String[] subFields = statusField.trim().split(",");
+    for (String subField : subFields) {
+      data = extractFields(subField);
+      record.add("status-" + data[0].trim(), data[1].trim());
+    }
+  }
+
+  static String[] extractFields(String line) {
+    String[] args = new String[2];
+    int index = line.indexOf("=");
+    args[0] = line.substring(0, index);
+    args[1] = line.substring(index + 1);
+
+    return args;
+  }
+
+  public String getDataType() {
+    return PbsNodes.rawPBSRecordType;
+  }
+
 }
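
The field-splitting helpers above can be exercised on a hypothetical pbsnodes status attribute. A minimal sketch of the extractFields/parseStatusField behavior (the sample status string is invented):

    public class PbsFieldsDemo {
      // Mirrors PbsNodes.extractFields: split "name=value" on the first '='.
      static String[] extractFields(String line) {
        int index = line.indexOf("=");
        return new String[] { line.substring(0, index),
            line.substring(index + 1) };
      }

      public static void main(String[] args) {
        // Hypothetical status value, comma-separated as in parseStatusField.
        String statusField = "opsys=linux,loadave=0.10,state=free";
        for (String subField : statusField.split(",")) {
          String[] data = extractFields(subField);
          // Each sub-field becomes a "status-<name>" record field.
          System.out.println("status-" + data[0].trim() + " = "
              + data[1].trim());
        }
      }
    }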

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ProcessorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ProcessorFactory.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ProcessorFactory.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ProcessorFactory.java Wed Mar 11 22:39:26 2009
@@ -18,65 +18,63 @@
 
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
-import java.util.HashMap;
 
+import java.util.HashMap;
 import org.apache.log4j.Logger;
 
+public class ProcessorFactory {
+  static Logger log = Logger.getLogger(ProcessorFactory.class);
 
-
-public class ProcessorFactory
-{
-	static Logger log = Logger.getLogger(ProcessorFactory.class);
-	
-	// TODO
-	//	add new mapper package at the end.
-	//	We should have a more generic way to do this.
-	//	Ex: read from config
-	//	list of alias
-	//	and
-	//	alias -> processor class
-	
-	
-	private static HashMap<String,ChunkProcessor > processors =
-	    new HashMap<String, ChunkProcessor>(); // registry
-		
-	private ProcessorFactory()
-	{}
-	
-	public static ChunkProcessor getProcessor(String recordType)
-	 throws UnknownRecordTypeException
-	{
-		String path = "org.apache.hadoop.chukwa.extraction.demux.processor.mapper"+recordType;
-		if (processors.containsKey(recordType)) {
-			return processors.get(recordType);
-		} else {
-			ChunkProcessor processor = null;
-			try {
-				processor = (ChunkProcessor)Class.forName(path).getConstructor().newInstance();
-			} catch(ClassNotFoundException e) {
-				throw new UnknownRecordTypeException("Unknown recordType:" + recordType, e);			
-			} catch(Exception e) {
-			  throw new UnknownRecordTypeException("error constructing processor", e);
-			}
-			
-			//TODO using a ThreadSafe/reuse flag to actually decide if we want 
-			// to reuse the same processor again and again
-			register(recordType,processor);
-			return processor;
-		}
-	}
-	
-	  /** Register a specific parser for a {@link ChunkProcessor}
-	   * implementation. */
-	  public static synchronized void register(String recordType,
-	                                         ChunkProcessor processor) 
-	  {
-		  log.info("register " + processor.getClass().getName() + " for this recordType :" + recordType);
-		  if (processors.containsKey(recordType))
-			{
-			  throw new DuplicateProcessorException("Duplicate processor for recordType:" + recordType);
-			}
-		  ProcessorFactory.processors.put(recordType, processor);
-	  }
+  // TODO: we need a more generic way to register mappers than adding
+  // new packages here, e.g. read a list of aliases and an
+  // alias -> processor class mapping from the configuration.
+
+  private static HashMap<String, ChunkProcessor> processors = new HashMap<String, ChunkProcessor>(); // registry
+
+  private ProcessorFactory() {
+  }
+
+  public static ChunkProcessor getProcessor(String recordType)
+      throws UnknownRecordTypeException {
+    String path = "org.apache.hadoop.chukwa.extraction.demux.processor.mapper"
+        + recordType;
+    if (processors.containsKey(recordType)) {
+      return processors.get(recordType);
+    } else {
+      ChunkProcessor processor = null;
+      try {
+        processor = (ChunkProcessor) Class.forName(path).getConstructor()
+            .newInstance();
+      } catch (ClassNotFoundException e) {
+        throw new UnknownRecordTypeException(
+            "Unknown recordType: " + recordType, e);
+      } catch (Exception e) {
+        throw new UnknownRecordTypeException("error constructing processor", e);
+      }
+
+      // TODO: use a thread-safe/reuse flag to decide whether to reuse
+      // the same processor instance across calls
+      register(recordType, processor);
+      return processor;
+    }
+  }
+
+  /**
+   * Register a specific parser for a {@link ChunkProcessor} implementation.
+   */
+  public static synchronized void register(String recordType,
+      ChunkProcessor processor) {
+    log.info("register " + processor.getClass().getName()
+        + " for this recordType :" + recordType);
+    if (processors.containsKey(recordType)) {
+      throw new DuplicateProcessorException(
+          "Duplicate processor for recordType:" + recordType);
+    }
+    ProcessorFactory.processors.put(recordType, processor);
+  }
 
 }
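
Beyond the by-name lookup, parsers can also be registered explicitly under an alias. A hedged wiring sketch follows; MyCustomProcessor is a hypothetical ChunkProcessor implementation, and register throws DuplicateProcessorException if the alias is already taken:

    public class RegisterDemo {
      public static void main(String[] args) throws UnknownRecordTypeException {
        // Hypothetical processor; must implement ChunkProcessor.
        ProcessorFactory.register("MyRecordType", new MyCustomProcessor());
        // Resolves to the cached instance registered above.
        ChunkProcessor p = ProcessorFactory.getProcessor("MyRecordType");
        System.out.println("resolved: " + p.getClass().getName());
      }
    }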