You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by as...@apache.org on 2009/03/11 23:39:32 UTC

svn commit: r752666 [7/16] - in /hadoop/chukwa/trunk: ./ src/java/org/apache/hadoop/chukwa/ src/java/org/apache/hadoop/chukwa/conf/ src/java/org/apache/hadoop/chukwa/database/ src/java/org/apache/hadoop/chukwa/datacollection/ src/java/org/apache/hadoop...

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java Wed Mar 11 22:39:26 2009
@@ -18,11 +18,11 @@
 
 package org.apache.hadoop.chukwa.extraction.demux;
 
+
 import java.io.IOException;
 import java.net.URI;
 import java.text.SimpleDateFormat;
 import java.util.Calendar;
-
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -34,251 +34,237 @@
 // First version of the Spill
 // need some polishing
 
-public class MoveToRepository
-{
-	static Logger log = Logger.getLogger(MoveToRepository.class);
-	
-	static ChukwaConfiguration conf = null;
-	static FileSystem fs = null;
-	static final String HadoopLogDir = "_logs";
-	static final String hadoopTempDir = "_temporary";
-	static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
-	static Calendar calendar = Calendar.getInstance();
-	
-	static void processClutserDirectory(Path srcDir,String destDir) throws Exception
-	{
-		log.info("processClutserDirectory (" + srcDir.getName() + "," + destDir +")");
-		FileStatus fstat = fs.getFileStatus(srcDir);
-		
-		if (!fstat.isDir())
-		{
-			throw new IOException(srcDir + " is not a directory!");
-		}
-		else
-		{
-			FileStatus[] datasourceDirectories = fs.listStatus(srcDir);
-			
-			for(FileStatus datasourceDirectory : datasourceDirectories)
-			{
-				log.info(datasourceDirectory.getPath() + " isDir?" +datasourceDirectory.isDir());
-				if (!datasourceDirectory.isDir())
-				{
-					throw new IOException("Top level datasource directory should be a directory :" + datasourceDirectory.getPath());
-				}
-				
-				String dirName = datasourceDirectory.getPath().getName();
-				Path destPath = new Path(destDir + "/" + dirName);
-				log.info("dest directory path: " + destPath);
-				log.info("processClutserDirectory processing Datasource: (" + dirName +")");
-				processDatasourceDirectory(srcDir.getName(),datasourceDirectory.getPath(),destDir + "/" + dirName);
-			}
-		}
-	}
-	
-	static void processDatasourceDirectory(String cluster,Path srcDir,String destDir) throws Exception
-	{
-		String fileName = null;
-		int fileDay  = 0;
-		int fileHour = 0;
-		int fileMin  = 0;
-		
-		FileStatus[] recordFiles = fs.listStatus(srcDir);
-		for(FileStatus recordFile : recordFiles)
-		{
-			//   dataSource_20080915_18_15.1.evt
-			// <datasource>_<yyyyMMdd_HH_mm>.1.evt
-			
-			fileName = recordFile.getPath().getName();
-			log.info("processDatasourceDirectory processing RecordFile: (" + fileName +")");
-			log.info("fileName: " + fileName);
-			
-			
-			int l = fileName.length();
-			String dataSource = srcDir.getName();
-			log.info("Datasource: " + dataSource);
-			
-			if (fileName.endsWith(".D.evt"))
-			{
-				// Hadoop_dfs_datanode_20080919.D.evt
-				
-				fileDay = Integer.parseInt(fileName.substring(l-14,l-6));
-				writeRecordFile(destDir + "/" + fileDay + "/", recordFile.getPath(),dataSource + "_" +fileDay);
-				// mark this directory for Daily rotate (re-process)
-				addDirectory4Rolling( true,fileDay , fileHour, cluster , dataSource);
-			}
-			else if (fileName.endsWith(".H.evt"))
-			{ 
-				// Hadoop_dfs_datanode_20080925_1.H.evt
-				// Hadoop_dfs_datanode_20080925_12.H.evt
-				
-				String day = null;
-			    String hour = null;
-			    if (fileName.charAt(l-8) == '_')
-			    {
-			    	day = fileName.substring(l-16,l-8);
-			    	log.info("day->" + day);
-			    	hour = "" +fileName.charAt(l-7);
-			    	log.info("hour->" +hour);
-			    }
-			    else
-			    {
-			    	day = fileName.substring(l-17,l-9);
-			    	log.info("day->" +day);
-			    	hour = fileName.substring(l-8,l-6);
-			    	log.info("hour->" +hour);
-			    }
-			    fileDay = Integer.parseInt(day);
-			    fileHour = Integer.parseInt(hour);
-			    // rotate there so spill
-				writeRecordFile(destDir + "/" + fileDay + "/" + fileHour + "/", recordFile.getPath(),dataSource + "_"  +fileDay+ "_" + fileHour );
-				// mark this directory for daily rotate
-				addDirectory4Rolling( true,fileDay , fileHour, cluster , dataSource);
-			}
-			else if (fileName.endsWith(".R.evt"))
-			{
-				if (fileName.charAt(l-11) == '_')
-				{
-					fileDay = Integer.parseInt(fileName.substring(l-19,l-11));
-					fileHour = Integer.parseInt(""+fileName.charAt(l-10));
-					fileMin = Integer.parseInt(fileName.substring(l-8,l-6));
-				}
-				else
-				{
-					fileDay = Integer.parseInt(fileName.substring(l-20,l-12));
-					fileHour = Integer.parseInt(fileName.substring(l-11,l-9));
-					fileMin = Integer.parseInt(fileName.substring(l-8,l-6));
-				}
-
-				log.info("fileDay: " + fileDay);
-				log.info("fileHour: " + fileHour);
-				log.info("fileMin: " + fileMin);
-				writeRecordFile(destDir + "/" + fileDay + "/" + fileHour + "/" + fileMin, recordFile.getPath(),dataSource + "_" +fileDay+ "_" + fileHour +"_" +fileMin);
-				// mark this directory for hourly rotate
-				addDirectory4Rolling( false,fileDay , fileHour, cluster , dataSource);
-			}
-			else
-			{
-				throw new RuntimeException("Wrong fileName format! [" + fileName+"]");
-			}
-		}
-	}
-			
-	static void addDirectory4Rolling(boolean isDailyOnly, int day,int hour,String cluster, String dataSource) throws IOException
-	{
-		// TODO get root directory from config
-		String rollingDirectory = "/chukwa/rolling/";
-		
-		Path path = new Path(rollingDirectory + "/daily/" + day + "/" + cluster +"/" + dataSource);
-		if (!fs.exists(path))
-			{ fs.mkdirs(path);}
-		
-		if (!isDailyOnly)
-		{
-			path = new Path(rollingDirectory + "/hourly/" + day + "/"  + hour + "/" + cluster +"/" + dataSource);
-			if (!fs.exists(path))
-				{ fs.mkdirs(path);}
-		}
-	}
-	
-	static void writeRecordFile(String destDir,Path recordFile,String fileName) throws IOException
-	{
-		boolean done = false;
-		int count = 1;
-		do
-		{
-			Path destDirPath = new Path(destDir );
-			Path destFilePath = new Path(destDir + "/" + fileName + "." + count + ".evt" );
-			
-			if (!fs.exists(destDirPath))
-			{
-				fs.mkdirs(destDirPath);
-				log.info(">>>>>>>>>>>> create Dir" + destDirPath);
-			}
-			
-			if (!fs.exists(destFilePath))
-			{
-				log.info(">>>>>>>>>>>> Before Rename" + recordFile + " -- "+ destFilePath);
-				//fs.rename(recordFile,destFilePath);
-				FileUtil.copy(fs,recordFile,fs,destFilePath,false,false,conf);
-				//FileUtil.replaceFile(new File(recordFile.toUri()), new File(destFilePath.toUri()));
-				done = true;
-				log.info(">>>>>>>>>>>> after Rename" + destFilePath);
-			}
-			else
-			{
-				log.info("Start MoveToRepository main()");
-			}
-			count ++;
-			// Just put a limit here
-			// TODO read from config
-			if (count > 1000)
-			{
-				throw new IOException("too many files in this directory: " + destDir);
-			}
-		} while (!done);
-	}
-	
-	static boolean checkRotate(String directoryAsString, boolean createDirectoryIfNotExist) throws IOException
-	{
-		Path directory = new Path(directoryAsString);
-		boolean exist = fs.exists(directory);
-	
-		if (! exist )
-		{
-			if (createDirectoryIfNotExist== true)
-				{ fs.mkdirs(directory); }
-			return false;
-		}
-		else
-		{
-			return fs.exists(new Path(directoryAsString + "/rotateDone"));
-		}
-	}
-				
-	/**
-	 * @param args
-	 * @throws Exception 
-	 */
-	public static void main(String[] args) throws Exception
-	{
-		conf = new ChukwaConfiguration();
-		String fsName = conf.get("writer.hdfs.filesystem");
-		fs = FileSystem.get(new URI(fsName), conf);
-		
-		Path srcDir = new Path(args[0]);
-		String destDir = args[1];
-		
-		log.info("Start MoveToRepository main()");
-		
-		FileStatus fstat = fs.getFileStatus(srcDir);
-		
-		if (!fstat.isDir())
-		{
-			throw new IOException(srcDir + " is not a directory!");
-		}
-		else
-		{
-			FileStatus[] clusters = fs.listStatus(srcDir);
-			// Run a moveOrMerge on all clusters
-			String name = null;
-			for(FileStatus cluster : clusters)
-			{
-				name = cluster.getPath().getName();
-				// Skip hadoop M/R outputDir
-				if ( (name.intern() == HadoopLogDir.intern() ) || (name.intern() == hadoopTempDir.intern()) )
-				{
-					continue;
-				}
-				log.info("main procesing Cluster (" + cluster.getPath().getName() +")");
-				processClutserDirectory(cluster.getPath(),destDir + "/" + cluster.getPath().getName());
-				
-				// Delete the demux's cluster dir 
-				FileUtil.fullyDelete(fs,cluster.getPath());
-			}
-		}
-		
-		log.info("Done with MoveToRepository main()");
+public class MoveToRepository {
+  static Logger log = Logger.getLogger(MoveToRepository.class);
+
+  static ChukwaConfiguration conf = null;
+  static FileSystem fs = null;
+  static final String HadoopLogDir = "_logs";
+  static final String hadoopTempDir = "_temporary";
+  static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
+  static Calendar calendar = Calendar.getInstance();
+
+  static void processClutserDirectory(Path srcDir, String destDir)
+      throws Exception {
+    log.info("processClutserDirectory (" + srcDir.getName() + "," + destDir
+        + ")");
+    FileStatus fstat = fs.getFileStatus(srcDir);
+
+    if (!fstat.isDir()) {
+      throw new IOException(srcDir + " is not a directory!");
+    } else {
+      FileStatus[] datasourceDirectories = fs.listStatus(srcDir);
+
+      for (FileStatus datasourceDirectory : datasourceDirectories) {
+        log.info(datasourceDirectory.getPath() + " isDir?"
+            + datasourceDirectory.isDir());
+        if (!datasourceDirectory.isDir()) {
+          throw new IOException(
+              "Top level datasource directory should be a directory :"
+                  + datasourceDirectory.getPath());
+        }
+
+        String dirName = datasourceDirectory.getPath().getName();
+        Path destPath = new Path(destDir + "/" + dirName);
+        log.info("dest directory path: " + destPath);
+        log.info("processClutserDirectory processing Datasource: (" + dirName
+            + ")");
+        processDatasourceDirectory(srcDir.getName(), datasourceDirectory
+            .getPath(), destDir + "/" + dirName);
+      }
+    }
+  }
+
+  static void processDatasourceDirectory(String cluster, Path srcDir,
+      String destDir) throws Exception {
+    String fileName = null;
+    int fileDay = 0;
+    int fileHour = 0;
+    int fileMin = 0;
+
+    FileStatus[] recordFiles = fs.listStatus(srcDir);
+    for (FileStatus recordFile : recordFiles) {
+      // dataSource_20080915_18_15.1.evt
+      // <datasource>_<yyyyMMdd_HH_mm>.1.evt
+
+      fileName = recordFile.getPath().getName();
+      log.info("processDatasourceDirectory processing RecordFile: (" + fileName
+          + ")");
+      log.info("fileName: " + fileName);
+
+      int l = fileName.length();
+      String dataSource = srcDir.getName();
+      log.info("Datasource: " + dataSource);
+
+      if (fileName.endsWith(".D.evt")) {
+        // Hadoop_dfs_datanode_20080919.D.evt
+
+        fileDay = Integer.parseInt(fileName.substring(l - 14, l - 6));
+        writeRecordFile(destDir + "/" + fileDay + "/", recordFile.getPath(),
+            dataSource + "_" + fileDay);
+        // mark this directory for Daily rotate (re-process)
+        addDirectory4Rolling(true, fileDay, fileHour, cluster, dataSource);
+      } else if (fileName.endsWith(".H.evt")) {
+        // Hadoop_dfs_datanode_20080925_1.H.evt
+        // Hadoop_dfs_datanode_20080925_12.H.evt
+
+        String day = null;
+        String hour = null;
+        if (fileName.charAt(l - 8) == '_') {
+          day = fileName.substring(l - 16, l - 8);
+          log.info("day->" + day);
+          hour = "" + fileName.charAt(l - 7);
+          log.info("hour->" + hour);
+        } else {
+          day = fileName.substring(l - 17, l - 9);
+          log.info("day->" + day);
+          hour = fileName.substring(l - 8, l - 6);
+          log.info("hour->" + hour);
+        }
+        fileDay = Integer.parseInt(day);
+        fileHour = Integer.parseInt(hour);
+        // rotate there so spill
+        writeRecordFile(destDir + "/" + fileDay + "/" + fileHour + "/",
+            recordFile.getPath(), dataSource + "_" + fileDay + "_" + fileHour);
+        // mark this directory for daily rotate
+        addDirectory4Rolling(true, fileDay, fileHour, cluster, dataSource);
+      } else if (fileName.endsWith(".R.evt")) {
+        if (fileName.charAt(l - 11) == '_') {
+          fileDay = Integer.parseInt(fileName.substring(l - 19, l - 11));
+          fileHour = Integer.parseInt("" + fileName.charAt(l - 10));
+          fileMin = Integer.parseInt(fileName.substring(l - 8, l - 6));
+        } else {
+          fileDay = Integer.parseInt(fileName.substring(l - 20, l - 12));
+          fileHour = Integer.parseInt(fileName.substring(l - 11, l - 9));
+          fileMin = Integer.parseInt(fileName.substring(l - 8, l - 6));
+        }
+
+        log.info("fileDay: " + fileDay);
+        log.info("fileHour: " + fileHour);
+        log.info("fileMin: " + fileMin);
+        writeRecordFile(destDir + "/" + fileDay + "/" + fileHour + "/"
+            + fileMin, recordFile.getPath(), dataSource + "_" + fileDay + "_"
+            + fileHour + "_" + fileMin);
+        // mark this directory for hourly rotate
+        addDirectory4Rolling(false, fileDay, fileHour, cluster, dataSource);
+      } else {
+        throw new RuntimeException("Wrong fileName format! [" + fileName + "]");
+      }
+    }
+  }
+
+  static void addDirectory4Rolling(boolean isDailyOnly, int day, int hour,
+      String cluster, String dataSource) throws IOException {
+    // TODO get root directory from config
+    String rollingDirectory = "/chukwa/rolling/";
+
+    Path path = new Path(rollingDirectory + "/daily/" + day + "/" + cluster
+        + "/" + dataSource);
+    if (!fs.exists(path)) {
+      fs.mkdirs(path);
+    }
+
+    if (!isDailyOnly) {
+      path = new Path(rollingDirectory + "/hourly/" + day + "/" + hour + "/"
+          + cluster + "/" + dataSource);
+      if (!fs.exists(path)) {
+        fs.mkdirs(path);
+      }
+    }
+  }
+
+  static void writeRecordFile(String destDir, Path recordFile, String fileName)
+      throws IOException {
+    boolean done = false;
+    int count = 1;
+    do {
+      Path destDirPath = new Path(destDir);
+      Path destFilePath = new Path(destDir + "/" + fileName + "." + count
+          + ".evt");
+
+      if (!fs.exists(destDirPath)) {
+        fs.mkdirs(destDirPath);
+        log.info(">>>>>>>>>>>> create Dir" + destDirPath);
+      }
+
+      if (!fs.exists(destFilePath)) {
+        log.info(">>>>>>>>>>>> Before Rename" + recordFile + " -- "
+            + destFilePath);
+        // fs.rename(recordFile,destFilePath);
+        FileUtil.copy(fs, recordFile, fs, destFilePath, false, false, conf);
+        // FileUtil.replaceFile(new File(recordFile.toUri()), new
+        // File(destFilePath.toUri()));
+        done = true;
+        log.info(">>>>>>>>>>>> after Rename" + destFilePath);
+      } else {
+        log.info("Start MoveToRepository main()");
+      }
+      count++;
+      // Just put a limit here
+      // TODO read from config
+      if (count > 1000) {
+        throw new IOException("too many files in this directory: " + destDir);
+      }
+    } while (!done);
+  }
+
+  static boolean checkRotate(String directoryAsString,
+      boolean createDirectoryIfNotExist) throws IOException {
+    Path directory = new Path(directoryAsString);
+    boolean exist = fs.exists(directory);
+
+    if (!exist) {
+      if (createDirectoryIfNotExist == true) {
+        fs.mkdirs(directory);
+      }
+      return false;
+    } else {
+      return fs.exists(new Path(directoryAsString + "/rotateDone"));
+    }
+  }
+
+  /**
+   * @param args
+   * @throws Exception
+   */
+  public static void main(String[] args) throws Exception {
+    conf = new ChukwaConfiguration();
+    String fsName = conf.get("writer.hdfs.filesystem");
+    fs = FileSystem.get(new URI(fsName), conf);
+
+    Path srcDir = new Path(args[0]);
+    String destDir = args[1];
+
+    log.info("Start MoveToRepository main()");
+
+    FileStatus fstat = fs.getFileStatus(srcDir);
+
+    if (!fstat.isDir()) {
+      throw new IOException(srcDir + " is not a directory!");
+    } else {
+      FileStatus[] clusters = fs.listStatus(srcDir);
+      // Run a moveOrMerge on all clusters
+      String name = null;
+      for (FileStatus cluster : clusters) {
+        name = cluster.getPath().getName();
+        // Skip hadoop M/R outputDir
+        if ((name.intern() == HadoopLogDir.intern())
+            || (name.intern() == hadoopTempDir.intern())) {
+          continue;
+        }
+        log
+            .info("main procesing Cluster (" + cluster.getPath().getName()
+                + ")");
+        processClutserDirectory(cluster.getPath(), destDir + "/"
+            + cluster.getPath().getName());
+
+        // Delete the demux's cluster dir
+        FileUtil.fullyDelete(fs, cluster.getPath());
+      }
+    }
+
+    log.info("Done with MoveToRepository main()");
 
-	}
+  }
 
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/RecordMerger.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/RecordMerger.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/RecordMerger.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/RecordMerger.java Wed Mar 11 22:39:26 2009
@@ -18,8 +18,8 @@
 
 package org.apache.hadoop.chukwa.extraction.demux;
 
-import java.io.IOException;
 
+import java.io.IOException;
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -29,115 +29,105 @@
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.log4j.Logger;
 
-public class RecordMerger extends Thread
-{
-	static Logger log = Logger.getLogger(RecordMerger.class);
-	ChukwaConfiguration conf = null;
-	FileSystem fs = null;
-	String[] mergeArgs = null;
-	Tool tool = null;
-	boolean deleteRawData = false;
-	
-	public RecordMerger(ChukwaConfiguration conf,FileSystem fs,Tool tool,String[] mergeArgs,boolean deleteRawData)
-	{
-		this.conf = conf;
-		this.fs = fs;
-		this.tool = tool;
-		this.mergeArgs = mergeArgs;
-		this.deleteRawData = deleteRawData;
-	}
-	@Override
-	public void run()
-	{
-		System.out.println("\t Running Merge! : output [" + mergeArgs[1] +"]");
-		int res;
-		try
-		{
-			res = ToolRunner.run(conf,tool, mergeArgs);
-			System.out.println("MR exit status: " + res);
-			if (res == 0)
-			{
-				writeRecordFile(mergeArgs[1]+"/part-00000",mergeArgs[2],mergeArgs[3]);
-				
-				// delete input
-				if (deleteRawData)
-				 { 
-					FileUtil.fullyDelete(fs,new Path(mergeArgs[0])); 
-					
-					Path hours = new Path(mergeArgs[2]) ;
-					FileStatus[] hoursOrMinutesFS = fs.listStatus(hours);
-					for(FileStatus hourOrMinuteFS : hoursOrMinutesFS)
-					{
-						String dirName = hourOrMinuteFS.getPath().getName();
-						
-						try
-						{ 
-							Integer.parseInt(dirName);
-							FileUtil.fullyDelete(fs,new Path(mergeArgs[2] + "/" + dirName)); 
-							if (log.isDebugEnabled() )
-								{ log.debug("Deleting Hour directory: " + mergeArgs[2] + "/" + dirName); }
-						}
-						catch(NumberFormatException e) { /* Not an Hour or Minutes directory- Do nothing */ }
-					}
-				 }
-				
-				// delete rolling tag
-				FileUtil.fullyDelete(fs, new Path(mergeArgs[3]));
-				// delete M/R temp directory
-				FileUtil.fullyDelete(fs, new Path(mergeArgs[1]));
-			}
-			else
-			{
-				throw new RuntimeException("Error in M/R merge operation!");
-			}
-
-		} 
-		catch (Exception e)
-		{
-			e.printStackTrace();
-			throw new RuntimeException("Error in M/R merge operation!",e);
-		}
-	}
-	
-	
-	void writeRecordFile(String input,String outputDir,String fileName) throws IOException
-	{
-		boolean done = false;
-		int count = 1;
-		Path recordFile = new Path(input);
-		do
-		{
-			Path destDirPath = new Path(outputDir );
-			Path destFilePath = new Path(outputDir + "/" + fileName + "." + count + ".evt" );
-			
-			if (!fs.exists(destDirPath))
-			{
-				fs.mkdirs(destDirPath);
-				log.info(">>>>>>>>>>>> create Dir" + destDirPath);
-			}
-			
-			if (!fs.exists(destFilePath))
-			{
-				boolean res = fs.rename(recordFile,destFilePath);
-				
-				if (res == false)
-				{
-					log.info(">>>>>>>>>>>> Use standard copy rename failded");
-					FileUtil.copy(fs,recordFile,fs,destFilePath,false,false,conf);
-				}
-				done = true;
-			}
-			else
-			{
-				log.info("Start MoveToRepository main()");
-			}
-			count ++;
-			// Just put a limit here
-			// TODO read from config
-			if (count > 1000)
-			{
-				throw new IOException("too many files in this directory: " + destDirPath);
-			}
-		} while (!done);
-	}
+public class RecordMerger extends Thread {
+  static Logger log = Logger.getLogger(RecordMerger.class);
+  ChukwaConfiguration conf = null;
+  FileSystem fs = null;
+  String[] mergeArgs = null;
+  Tool tool = null;
+  boolean deleteRawData = false;
+
+  public RecordMerger(ChukwaConfiguration conf, FileSystem fs, Tool tool,
+                      String[] mergeArgs, boolean deleteRawData) {
+    this.conf = conf;
+    this.fs = fs;
+    this.tool = tool;
+    this.mergeArgs = mergeArgs;
+    this.deleteRawData = deleteRawData;
+  }
+
+  @Override
+  public void run() {
+    System.out.println("\t Running Merge! : output [" + mergeArgs[1] + "]");
+    int res;
+    try {
+      res = ToolRunner.run(conf, tool, mergeArgs);
+      System.out.println("MR exit status: " + res);
+      if (res == 0) {
+        writeRecordFile(mergeArgs[1] + "/part-00000", mergeArgs[2],
+            mergeArgs[3]);
+
+        // delete input
+        if (deleteRawData) {
+          FileUtil.fullyDelete(fs, new Path(mergeArgs[0]));
+
+          Path hours = new Path(mergeArgs[2]);
+          FileStatus[] hoursOrMinutesFS = fs.listStatus(hours);
+          for (FileStatus hourOrMinuteFS : hoursOrMinutesFS) {
+            String dirName = hourOrMinuteFS.getPath().getName();
+
+            try {
+              Integer.parseInt(dirName);
+              FileUtil.fullyDelete(fs, new Path(mergeArgs[2] + "/" + dirName));
+              if (log.isDebugEnabled()) {
+                log.debug("Deleting Hour directory: " + mergeArgs[2] + "/"
+                    + dirName);
+              }
+            } catch (NumberFormatException e) { /*
+                                                 * Not an Hour or Minutes
+                                                 * directory- Do nothing
+                                                 */
+            }
+          }
+        }
+
+        // delete rolling tag
+        FileUtil.fullyDelete(fs, new Path(mergeArgs[3]));
+        // delete M/R temp directory
+        FileUtil.fullyDelete(fs, new Path(mergeArgs[1]));
+      } else {
+        throw new RuntimeException("Error in M/R merge operation!");
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      throw new RuntimeException("Error in M/R merge operation!", e);
+    }
+  }
+
+  void writeRecordFile(String input, String outputDir, String fileName)
+      throws IOException {
+    boolean done = false;
+    int count = 1;
+    Path recordFile = new Path(input);
+    do {
+      Path destDirPath = new Path(outputDir);
+      Path destFilePath = new Path(outputDir + "/" + fileName + "." + count
+          + ".evt");
+
+      if (!fs.exists(destDirPath)) {
+        fs.mkdirs(destDirPath);
+        log.info(">>>>>>>>>>>> create Dir" + destDirPath);
+      }
+
+      if (!fs.exists(destFilePath)) {
+        boolean res = fs.rename(recordFile, destFilePath);
+
+        if (res == false) {
+          log.info(">>>>>>>>>>>> Use standard copy rename failded");
+          FileUtil.copy(fs, recordFile, fs, destFilePath, false, false, conf);
+        }
+        done = true;
+      } else {
+        log.info("Start MoveToRepository main()");
+      }
+      count++;
+      // Just put a limit here
+      // TODO read from config
+      if (count > 1000) {
+        throw new IOException("too many files in this directory: "
+            + destDirPath);
+      }
+    } while (!done);
+  }
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/TaggerPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/TaggerPlugin.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/TaggerPlugin.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/TaggerPlugin.java Wed Mar 11 22:39:26 2009
@@ -18,9 +18,9 @@
 
 package org.apache.hadoop.chukwa.extraction.demux;
 
+
 import org.apache.hadoop.chukwa.extraction.engine.Record;
 
-public interface TaggerPlugin
-{
-	public void tag(String line, Record record);
+public interface TaggerPlugin {
+  public void tag(String line, Record record);
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/ChukwaOutputCollector.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/ChukwaOutputCollector.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/ChukwaOutputCollector.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/ChukwaOutputCollector.java Wed Mar 11 22:39:26 2009
@@ -1,20 +1,22 @@
 package org.apache.hadoop.chukwa.extraction.demux.processor;
 
-import java.io.IOException;
 
+import java.io.IOException;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
-public class ChukwaOutputCollector implements OutputCollector<ChukwaRecordKey, ChukwaRecord>
-{
+public class ChukwaOutputCollector implements
+    OutputCollector<ChukwaRecordKey, ChukwaRecord> {
   private OutputCollector<ChukwaRecordKey, ChukwaRecord> outputCollector = null;
   private Reporter reporter = null;
   private String groupName = null;
-  
-  public ChukwaOutputCollector(String groupName,OutputCollector<ChukwaRecordKey, ChukwaRecord> outputCollector,Reporter reporter)
-  {
+
+  public ChukwaOutputCollector(
+                               String groupName,
+                               OutputCollector<ChukwaRecordKey, ChukwaRecord> outputCollector,
+                               Reporter reporter) {
     this.reporter = reporter;
     this.outputCollector = outputCollector;
     this.groupName = groupName;
@@ -22,12 +24,10 @@
 
   @Override
   public void collect(ChukwaRecordKey key, ChukwaRecord value)
-      throws IOException
-  {
+      throws IOException {
     this.outputCollector.collect(key, value);
     reporter.incrCounter(groupName, "total records", 1);
-    reporter.incrCounter(groupName,  key.getReduceType() +" records" , 1);
+    reporter.incrCounter(groupName, key.getReduceType() + " records", 1);
   }
 
-
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/Util.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/Util.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/Util.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/Util.java Wed Mar 11 22:39:26 2009
@@ -18,72 +18,58 @@
 
 package org.apache.hadoop.chukwa.extraction.demux.processor;
 
+
 import java.text.SimpleDateFormat;
 import java.util.Calendar;
 
-public class Util
-{
-	static SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd");
-	
-	static Calendar calendar = Calendar.getInstance();
-	static int currentDay = 0;
-	static int currentHour = 0;
-	
-	static
-	{
-		synchronized(calendar)
-		{
-			calendar.setTimeInMillis( System.currentTimeMillis());
-			currentDay = Integer.parseInt(day.format(calendar.getTime()));
-			currentHour = calendar.get(Calendar.HOUR_OF_DAY);
-		}
-	}
-	
-	public static String generateTimeOutput(long timestamp)
-	{
-		int workingDay = 0;
-		int workingHour = 0;
-
-		String output = null;
-
-		int minutes = 0;
-		synchronized(calendar)
-		{
-			calendar.setTimeInMillis( timestamp);
-			workingDay = Integer.parseInt(day.format(calendar.getTime()));
-			workingHour = calendar.get(Calendar.HOUR_OF_DAY);
-			minutes = calendar.get(Calendar.MINUTE);
-		}
-		
-		if (workingDay != currentDay)
-		{
-			output = "_" + workingDay + ".D.evt";
-		}
-		else
-		{
-			if (workingHour != currentHour)
-			{
-				output = "_" +workingDay + "_" +  workingHour + ".H.evt";
-			}
-			else
-			{
-				output = "_" + workingDay + "_" +  workingHour + "_";
-				int dec = minutes/10;
-				output +=  dec ;
-
-				int m = minutes - (dec*10);
-				if (m < 5) 
-				{ 
-					output += "0.R.evt";
-				} 
-				else 
-				{
-					output += "5.R.evt";
-				}
-			}
-		}
+public class Util {
+  static SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd");
 
+  static Calendar calendar = Calendar.getInstance();
+  static int currentDay = 0;
+  static int currentHour = 0;
+
+  static {
+    synchronized (calendar) {
+      calendar.setTimeInMillis(System.currentTimeMillis());
+      currentDay = Integer.parseInt(day.format(calendar.getTime()));
+      currentHour = calendar.get(Calendar.HOUR_OF_DAY);
+    }
+  }
+
+  public static String generateTimeOutput(long timestamp) {
+    int workingDay = 0;
+    int workingHour = 0;
+
+    String output = null;
+
+    int minutes = 0;
+    synchronized (calendar) {
+      calendar.setTimeInMillis(timestamp);
+      workingDay = Integer.parseInt(day.format(calendar.getTime()));
+      workingHour = calendar.get(Calendar.HOUR_OF_DAY);
+      minutes = calendar.get(Calendar.MINUTE);
+    }
+
+    if (workingDay != currentDay) {
+      output = "_" + workingDay + ".D.evt";
+    } else {
+      if (workingHour != currentHour) {
+        output = "_" + workingDay + "_" + workingHour + ".H.evt";
+      } else {
+        output = "_" + workingDay + "_" + workingHour + "_";
+        int dec = minutes / 10;
+        output += dec;
+
+        int m = minutes - (dec * 10);
+        if (m < 5) {
+          output += "0.R.evt";
+        } else {
+          output += "5.R.evt";
+        }
+      }
+    }
 
-		return output;
-	}
+    return output;
+  }
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java Wed Mar 11 22:39:26 2009
@@ -18,8 +18,8 @@
 
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
-import java.util.Calendar;
 
+import java.util.Calendar;
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
@@ -30,10 +30,9 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
 
-public abstract class AbstractProcessor implements MapProcessor
-{
+public abstract class AbstractProcessor implements MapProcessor {
   static Logger log = Logger.getLogger(AbstractProcessor.class);
-  
+
   Calendar calendar = Calendar.getInstance();
   byte[] bytes;
   int[] recordOffsets;
@@ -48,24 +47,19 @@
   OutputCollector<ChukwaRecordKey, ChukwaRecord> output = null;
   Reporter reporter = null;
 
-  public AbstractProcessor()
-  {
+  public AbstractProcessor() {
   }
 
   protected abstract void parse(String recordEntry,
       OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
       throws Throwable;
 
-  protected void saveChunkInError(Throwable throwable)
-  {
-    if (chunkInErrorSaved == false)
-    {
-      try
-      {
+  protected void saveChunkInError(Throwable throwable) {
+    if (chunkInErrorSaved == false) {
+      try {
         ChunkSaver.saveChunk(chunk, throwable, output, reporter);
         chunkInErrorSaved = true;
-      } catch (Exception e)
-      {
+      } catch (Exception e) {
         e.printStackTrace();
       }
     }
@@ -73,31 +67,26 @@
   }
 
   public void process(ChukwaArchiveKey archiveKey, Chunk chunk,
-      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
-  {
+      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) {
     chunkInErrorSaved = false;
-    
+
     this.archiveKey = archiveKey;
     this.output = output;
     this.reporter = reporter;
-    
+
     reset(chunk);
-    
-    while (hasNext())
-    {
-      try
-      {
+
+    while (hasNext()) {
+      try {
         parse(nextLine(), output, reporter);
-      } catch (Throwable e)
-      {
+      } catch (Throwable e) {
         saveChunkInError(e);
       }
     }
   }
 
   protected void buildGenericRecord(ChukwaRecord record, String body,
-      long timestamp, String dataSource)
-  {
+      long timestamp, String dataSource) {
     calendar.setTimeInMillis(timestamp);
     calendar.set(Calendar.MINUTE, 0);
     calendar.set(Calendar.SECOND, 0);
@@ -107,8 +96,7 @@
         + timestamp);
     key.setReduceType(dataSource);
 
-    if (body != null)
-    {
+    if (body != null) {
       record.add(Record.bodyField, body);
     }
     record.setTime(timestamp);
@@ -119,8 +107,7 @@
 
   }
 
-  protected void reset(Chunk chunk)
-  {
+  protected void reset(Chunk chunk) {
     this.chunk = chunk;
     this.bytes = chunk.getData();
     this.recordOffsets = chunk.getRecordOffsets();
@@ -128,13 +115,11 @@
     startOffset = 0;
   }
 
-  protected boolean hasNext()
-  {
+  protected boolean hasNext() {
     return (currentPos < recordOffsets.length);
   }
 
-  protected String nextLine()
-  {
+  protected String nextLine() {
     String log = new String(bytes, startOffset, (recordOffsets[currentPos]
         - startOffset + 1));
     startOffset = recordOffsets[currentPos] + 1;

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkProcessor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkProcessor.java Wed Mar 11 22:39:26 2009
@@ -18,14 +18,16 @@
 
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
+
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
-public interface ChunkProcessor
-{
-	public String getDataType();
-	public void process(Chunk chunk,OutputCollector<Text, ChukwaRecord> output, Reporter reporter);
+public interface ChunkProcessor {
+  public String getDataType();
+
+  public void process(Chunk chunk, OutputCollector<Text, ChukwaRecord> output,
+      Reporter reporter);
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkSaver.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkSaver.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkSaver.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkSaver.java Wed Mar 11 22:39:26 2009
@@ -18,8 +18,8 @@
 
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
-import java.util.Calendar;
 
+import java.util.Calendar;
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
@@ -30,17 +30,15 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
 
-public class ChunkSaver
-{
+public class ChunkSaver {
   static Logger log = Logger.getLogger(ChunkSaver.class);
+
   public static ChukwaRecord saveChunk(Chunk chunk, Throwable throwable,
-      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
-  {
-    try
-    {
+      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) {
+    try {
       reporter.incrCounter("DemuxError", "count", 1);
       reporter.incrCounter("DemuxError", chunk.getDataType() + "Count", 1);
- 
+
       ChukwaRecord record = new ChukwaRecord();
       long ts = System.currentTimeMillis();
       Calendar calendar = Calendar.getInstance();
@@ -68,22 +66,15 @@
       output.collect(key, record);
 
       return record;
-    }
-    catch (Throwable e) 
-    {
+    } catch (Throwable e) {
       e.printStackTrace();
-      try
-      {
-        log.warn("Unable to save a chunk: tags: " 
-            + chunk.getTags()   + " - source:"
-            + chunk.getSource() + " - dataType: "
-            + chunk.getDataType() + " - Stream: " 
-            + chunk.getStreamName() + " - SeqId: "
-            + chunk.getSeqID() + " - Data: " 
-            + new String(chunk.getData()) );
-      }
-      catch (Throwable e1) 
-      {
+      try {
+        log.warn("Unable to save a chunk: tags: " + chunk.getTags()
+            + " - source:" + chunk.getSource() + " - dataType: "
+            + chunk.getDataType() + " - Stream: " + chunk.getStreamName()
+            + " - SeqId: " + chunk.getSeqID() + " - Data: "
+            + new String(chunk.getData()));
+      } catch (Throwable e1) {
         e.printStackTrace();
       }
     }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFInvalidRecord.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFInvalidRecord.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFInvalidRecord.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFInvalidRecord.java Wed Mar 11 22:39:26 2009
@@ -18,31 +18,27 @@
 
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
-public class DFInvalidRecord extends Exception
-{
 
-	/**
+public class DFInvalidRecord extends Exception {
+
+  /**
 	 * 
 	 */
-	private static final long serialVersionUID = 1254238125122522523L;
+  private static final long serialVersionUID = 1254238125122522523L;
+
+  public DFInvalidRecord() {
+  }
 
-	public DFInvalidRecord()
-	{
-	}
-
-	public DFInvalidRecord(String arg0)
-	{
-		super(arg0);
-	}
-
-	public DFInvalidRecord(Throwable arg0)
-	{
-		super(arg0);
-	}
-
-	public DFInvalidRecord(String arg0, Throwable arg1)
-	{
-		super(arg0, arg1);
-	}
+  public DFInvalidRecord(String arg0) {
+    super(arg0);
+  }
+
+  public DFInvalidRecord(Throwable arg0) {
+    super(arg0);
+  }
+
+  public DFInvalidRecord(String arg0, Throwable arg1) {
+    super(arg0, arg1);
+  }
 
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DebugOutputProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DebugOutputProcessor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DebugOutputProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DebugOutputProcessor.java Wed Mar 11 22:39:26 2009
@@ -18,39 +18,34 @@
 
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
-import java.io.IOException;
 
+import java.io.IOException;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
 
-public class DebugOutputProcessor extends AbstractProcessor
-{
-	static Logger log = Logger.getLogger(DebugOutputProcessor.class);
-	public static final String recordType = "Debug";
-		
-	@Override
-	public void parse(String line, OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
-			Reporter reporter)
-	{
-		log.info("record: [" + line + "] type[" + chunk.getDataType() + "]");
-		
-		ChukwaRecord record = new ChukwaRecord();
-		buildGenericRecord(record,line, System.currentTimeMillis(),recordType);
-		key.setKey("" + chunk.getSeqID());
-		try
-		{
-			output.collect(key, record);
-		} catch (IOException e)
-		{
-			e.printStackTrace();
-		}
-	}
-
-	public String getDataType()
-	{
-		return DebugOutputProcessor.recordType;
-	}
+public class DebugOutputProcessor extends AbstractProcessor {
+  static Logger log = Logger.getLogger(DebugOutputProcessor.class);
+  public static final String recordType = "Debug";
+
+  @Override
+  public void parse(String line,
+      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) {
+    log.info("record: [" + line + "] type[" + chunk.getDataType() + "]");
+
+    ChukwaRecord record = new ChukwaRecord();
+    buildGenericRecord(record, line, System.currentTimeMillis(), recordType);
+    key.setKey("" + chunk.getSeqID());
+    try {
+      output.collect(key, record);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  public String getDataType() {
+    return DebugOutputProcessor.recordType;
+  }
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DefaultProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DefaultProcessor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DefaultProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DefaultProcessor.java Wed Mar 11 22:39:26 2009
@@ -1,32 +1,28 @@
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
-import java.io.IOException;
 
+import java.io.IOException;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
 
-public class DefaultProcessor extends AbstractProcessor
-{
-	static Logger log = Logger.getLogger(DefaultProcessor.class);
-	@Override
-	protected void parse(String recordEntry,
-			OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
-			Reporter reporter)
-	{
-		try
-		{
-			ChukwaRecord record = new ChukwaRecord();
-			this.buildGenericRecord(record, recordEntry, archiveKey.getTimePartition(), chunk.getDataType());
-			output.collect(key, record);
-		} 
-		catch (IOException e)
-		{
-			log.warn("Unable to collect output in DefaultProcessor ["
-					+ recordEntry + "]", e);
-			e.printStackTrace();
-		}
-	}
+public class DefaultProcessor extends AbstractProcessor {
+  static Logger log = Logger.getLogger(DefaultProcessor.class);
+
+  @Override
+  protected void parse(String recordEntry,
+      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) {
+    try {
+      ChukwaRecord record = new ChukwaRecord();
+      this.buildGenericRecord(record, recordEntry, archiveKey
+          .getTimePartition(), chunk.getDataType());
+      output.collect(key, record);
+    } catch (IOException e) {
+      log.warn("Unable to collect output in DefaultProcessor [" + recordEntry
+          + "]", e);
+      e.printStackTrace();
+    }
+  }
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Df.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Df.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Df.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Df.java Wed Mar 11 22:39:26 2009
@@ -18,107 +18,97 @@
 
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
+
 import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
-
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
 
-public class Df extends AbstractProcessor
-{
-	static Logger log = Logger.getLogger(Df.class);
-	private static final String[] headerSplitCols = { "Filesystem", "1K-blocks",
-			"Used", "Available", "Use%", "Mounted", "on" };
-	private static final String[] headerCols = { "Filesystem", "1K-blocks",
-    "Used", "Available", "Use%", "Mounted on" };
-	private SimpleDateFormat sdf = null;
-
-	public Df()
-	{
-		sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
-	}
-
-	@Override
-	protected void parse(String recordEntry,
-			OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
-			Reporter reporter)
-	throws Throwable
-	{
-	  
-		try
-		{
-			String dStr = recordEntry.substring(0, 23);
-			int start = 24;
-			int idx = recordEntry.indexOf(' ', start);
-			// String level = recordEntry.substring(start, idx);
-			start = idx + 1;
-			idx = recordEntry.indexOf(' ', start);
-			// String className = recordEntry.substring(start, idx-1);
-			String body = recordEntry.substring(idx + 1);
-
-			Date d = sdf.parse(dStr);
-			String[] lines = body.split("\n");
-
-			String[] outputCols = lines[0].split("[\\s]++");
-			
-			if (outputCols.length != headerSplitCols.length
-					|| outputCols[0].intern() != headerSplitCols[0].intern()
-					|| outputCols[1].intern() != headerSplitCols[1].intern()
-					|| outputCols[2].intern() != headerSplitCols[2].intern()
-					|| outputCols[3].intern() != headerSplitCols[3].intern()
-					|| outputCols[4].intern() != headerSplitCols[4].intern()
-					|| outputCols[5].intern() != headerSplitCols[5].intern()
-			    || outputCols[6].intern() != headerSplitCols[6].intern()
-			    )
-			{
-			  throw new DFInvalidRecord("Wrong output format (header) ["
-						+ recordEntry + "]");
-			}
-
-			String[] values = null;
-
-			// Data
-			ChukwaRecord record = null;
-
-			for (int i = 1; i < lines.length; i++)
-			{
-				values = lines[i].split("[\\s]++");
-				key = new ChukwaRecordKey();
-				record = new ChukwaRecord();
-				this.buildGenericRecord(record, null, d.getTime(), "Df");
-
-				record.add(headerCols[0], values[0]);
-				record.add(headerCols[1], values[1]);
-				record.add(headerCols[2], values[2]);
-				record.add(headerCols[3], values[3]);
-				record.add(headerCols[4], values[4].substring(0, values[4].length()-1)); // Remove %
-				record.add(headerCols[5], values[5]);
-
-				output.collect(key, record);
-			}
-
-			//log.info("DFProcessor output 1 DF record");
-		} catch (ParseException e)
-		{
-			e.printStackTrace();
-			log.warn("Wrong format in DFProcessor [" + recordEntry + "]", e);
-			throw e;
-		} catch (IOException e)
-		{
-			e.printStackTrace();
-			log.warn("Unable to collect output in DFProcessor [" + recordEntry
-					+ "]", e);
-			throw e;
-		} catch (DFInvalidRecord e)
-		{
-			e.printStackTrace();
-			log.warn("Wrong format in DFProcessor [" + recordEntry + "]", e);
-			throw e;
-		}
-	}
+public class Df extends AbstractProcessor {
+  static Logger log = Logger.getLogger(Df.class);
+  private static final String[] headerSplitCols = { "Filesystem", "1K-blocks",
+      "Used", "Available", "Use%", "Mounted", "on" };
+  private static final String[] headerCols = { "Filesystem", "1K-blocks",
+      "Used", "Available", "Use%", "Mounted on" };
+  private SimpleDateFormat sdf = null;
+
+  public Df() {
+    sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
+  }
+
+  /**
+   * Parses one captured "df" report: checks the column header, then emits a
+   * record for every following line, stamped with the entry's leading date.
+   *
+   * @param recordEntry timestamped entry whose body is the df output
+   * @param output collector that receives one record per data line
+   * @param reporter not used by this method
+   * @throws Throwable on failure; known error types are logged first
+   */
+  @Override
+  protected void parse(String recordEntry,
+      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+      throws Throwable {
+
+    try {
+      String dStr = recordEntry.substring(0, 23);
+      // Skip the two tokens that follow the timestamp; the rest is the report.
+      int idx = recordEntry.indexOf(' ', 24);
+      idx = recordEntry.indexOf(' ', idx + 1);
+      String body = recordEntry.substring(idx + 1);
+
+      Date d = sdf.parse(dStr);
+      String[] lines = body.split("\n");
+
+      String[] outputCols = lines[0].split("[\\s]++");
+
+      // Compare header names by value rather than by interned identity.
+      boolean headerOk = outputCols.length == headerSplitCols.length;
+      for (int c = 0; headerOk && c < headerSplitCols.length; c++) {
+        headerOk = headerSplitCols[c].equals(outputCols[c]);
+      }
+      if (!headerOk) {
+        throw new DFInvalidRecord("Wrong output format (header) ["
+            + recordEntry + "]");
+      }
+
+      for (int i = 1; i < lines.length; i++) {
+        String[] values = lines[i].split("[\\s]++");
+        if (values.length < headerCols.length) {
+          // Report short rows explicitly instead of failing on an index.
+          throw new DFInvalidRecord("Wrong output format (data) ["
+              + lines[i] + "]");
+        }
+
+        key = new ChukwaRecordKey();
+        ChukwaRecord record = new ChukwaRecord();
+        this.buildGenericRecord(record, null, d.getTime(), "Df");
+
+        record.add(headerCols[0], values[0]);
+        record.add(headerCols[1], values[1]);
+        record.add(headerCols[2], values[2]);
+        record.add(headerCols[3], values[3]);
+        record.add(headerCols[4], values[4]
+            .substring(0, values[4].length() - 1)); // Remove %
+        record.add(headerCols[5], values[5]);
+
+        output.collect(key, record);
+      }
+    } catch (ParseException e) {
+      log.warn("Wrong format in DFProcessor [" + recordEntry + "]", e);
+      throw e;
+    } catch (IOException e) {
+      log.warn("Unable to collect output in DFProcessor [" + recordEntry + "]",
+          e);
+      throw e;
+    } catch (DFInvalidRecord e) {
+      log.warn("Wrong format in DFProcessor [" + recordEntry + "]", e);
+      throw e;
+    }
+  }
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DuplicateProcessorException.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DuplicateProcessorException.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DuplicateProcessorException.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DuplicateProcessorException.java Wed Mar 11 22:39:26 2009
@@ -18,30 +18,27 @@
 
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
-public class DuplicateProcessorException extends RuntimeException
-{
 
-	/**
+public class DuplicateProcessorException extends RuntimeException {
+
+  /**
 	 * 
 	 */
-	private static final long serialVersionUID = 3890267797961057789L;
+  private static final long serialVersionUID = 3890267797961057789L;
 
-	public DuplicateProcessorException()
-	{}
+  public DuplicateProcessorException() {
+  }
 
-	public DuplicateProcessorException(String message)
-	{
-		super(message);
-	}
-
-	public DuplicateProcessorException(Throwable cause)
-	{
-		super(cause);
-	}
-
-	public DuplicateProcessorException(String message, Throwable cause)
-	{
-		super(message, cause);
-	}
+  public DuplicateProcessorException(String message) {
+    super(message);
+  }
+
+  public DuplicateProcessorException(Throwable cause) {
+    super(cause);
+  }
+
+  public DuplicateProcessorException(String message, Throwable cause) {
+    super(message, cause);
+  }
 
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopLogProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopLogProcessor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopLogProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopLogProcessor.java Wed Mar 11 22:39:26 2009
@@ -18,79 +18,66 @@
 
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
+
 import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
-
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
 
+public class HadoopLogProcessor extends AbstractProcessor {
+  static Logger log = Logger.getLogger(HadoopLogProcessor.class);
 
-
-public class HadoopLogProcessor extends AbstractProcessor
-{
-	static Logger log = Logger.getLogger(HadoopLogProcessor.class);
-	
-	private static final String recordType = "HadoopLog";
-	private static final String nameNodeType = "NameNode";
-	private static final String dataNodeType = "DataNode";
-	private static final String auditType = "Audit";
-	private SimpleDateFormat sdf = null;
-	
-	
-	public HadoopLogProcessor()
-	{
-		//TODO move that to config
-		sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
-	}
-	
-	@Override
-	public void parse(String recordEntry, OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
-			Reporter reporter)
-	 throws Throwable
-	{
-			try
-			{
-				String dStr = recordEntry.substring(0, 23);
-				Date d = sdf.parse(dStr);
-				ChukwaRecord record = new ChukwaRecord();
-
-				if (this.chunk.getStreamName().indexOf("datanode") > 0) {
-					buildGenericRecord(record,recordEntry,d.getTime(),dataNodeType);
-				} else if (this.chunk.getStreamName().indexOf("namenode") > 0) {
-					buildGenericRecord(record,recordEntry,d.getTime(),nameNodeType);
-				} else if (this.chunk.getStreamName().indexOf("audit") > 0) {
-					buildGenericRecord(record,recordEntry,d.getTime(),auditType);
-				} else {
-					buildGenericRecord(record,recordEntry,d.getTime(),recordType);
-				}
-				
-				
-				output.collect(key, record);
-			}
-			catch (ParseException e)
-			{
-				log.warn("Unable to parse the date in DefaultProcessor ["
-						+ recordEntry + "]", e);
-				e.printStackTrace();
-				throw e;
-			}
-			catch (IOException e)
-			{
-				log.warn("Unable to collect output in DefaultProcessor ["
-						+ recordEntry + "]", e);
-				e.printStackTrace();
-				throw e;
-			}		
-	}
-
-	public String getDataType()
-	{
-		return HadoopLogProcessor.recordType;
-	}
+  private static final String recordType = "HadoopLog";
+  private static final String nameNodeType = "NameNode";
+  private static final String dataNodeType = "DataNode";
+  private static final String auditType = "Audit";
+  private SimpleDateFormat sdf = null;
+
+  public HadoopLogProcessor() {
+    // TODO move that to config
+    sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+  }
+
+  /**
+   * Emits one record per log line; the stream name selects the reduce type.
+   */
+  @Override
+  public void parse(String recordEntry,
+      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+      throws Throwable {
+    try {
+      Date d = sdf.parse(recordEntry.substring(0, 23));
+      ChukwaRecord record = new ChukwaRecord();
+      String streamName = this.chunk.getStreamName();
+      // NOTE(review): "> 0" ignores a match at index 0 - confirm intended.
+      String type = recordType;
+      if (streamName.indexOf("datanode") > 0) {
+        type = dataNodeType;
+      } else if (streamName.indexOf("namenode") > 0) {
+        type = nameNodeType;
+      } else if (streamName.indexOf("audit") > 0) {
+        type = auditType;
+      }
+      buildGenericRecord(record, recordEntry, d.getTime(), type);
+      output.collect(key, record);
+    } catch (ParseException e) {
+      log.warn("Unable to parse the date in HadoopLogProcessor ["
+          + recordEntry + "]", e);
+      throw e;
+    } catch (IOException e) {
+      log.warn("Unable to collect output in HadoopLogProcessor ["
+          + recordEntry + "]", e);
+      throw e;
+    }
+  }
+
+  public String getDataType() {
+    return HadoopLogProcessor.recordType;
+  }
 
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java Wed Mar 11 22:39:26 2009
@@ -18,13 +18,13 @@
 
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
+
 import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Calendar;
 import java.util.Date;
 import java.util.Iterator;
-
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
@@ -35,86 +35,86 @@
 
 public class HadoopMetricsProcessor extends AbstractProcessor {
 
-	static Logger log = Logger.getLogger(HadoopMetricsProcessor.class);
-	static final String chukwaTimestampField = "chukwa_timestamp";
-	static final String contextNameField = "contextName";
-	static final String recordNameField = "recordName";
-
-	private SimpleDateFormat sdf = null;
-
-	public HadoopMetricsProcessor() {
-		// TODO move that to config
-		sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	protected void parse(String recordEntry,
-			OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
-			Reporter reporter) throws Throwable {
-		try {
-			String dStr = recordEntry.substring(0, 23);
-			int start = 24;
-			int idx = recordEntry.indexOf(' ', start);
-			// String level = recordEntry.substring(start, idx);
-			start = idx + 1;
-			idx = recordEntry.indexOf(' ', start);
-			// String className = recordEntry.substring(start, idx-1);
-			String body = recordEntry.substring(idx + 1);
-			body.replaceAll("\n", "");
-			// log.info("record [" + recordEntry + "] body [" + body +"]");
-			Date d = sdf.parse(dStr);
-
-			JSONObject json = new JSONObject(body);
-
-			ChukwaRecord record = new ChukwaRecord();
-			String datasource = null;
-			String recordName = null;
-
-			Iterator<String> ki = json.keys();
-			while (ki.hasNext()) {
-				String keyName = ki.next();
-				if (chukwaTimestampField.intern() == keyName.intern()) {
-					d = new Date(json.getLong(keyName));
-					Calendar cal = Calendar.getInstance();
-					cal.setTimeInMillis(d.getTime());
-					cal.set(Calendar.SECOND, 0);
-					cal.set(Calendar.MILLISECOND, 0);
-					d.setTime(cal.getTimeInMillis());
-				} else if (contextNameField.intern() == keyName.intern()) {
-					datasource = "Hadoop_" + json.getString(keyName);
-				} else if (recordNameField.intern() == keyName.intern()) {
-					recordName = json.getString(keyName);
-					record.add(keyName, json.getString(keyName));
-				} else {
-					record.add(keyName, json.getString(keyName));
-				}
-			}
-
-			datasource = datasource + "_" + recordName;
-			buildGenericRecord(record, null, d.getTime(), datasource);
-			output.collect(key, record);
-		} catch (ParseException e) {
-			e.printStackTrace();
-			log.warn("Wrong format in HadoopMetricsProcessor [" + recordEntry
-					+ "]", e);
-			throw e;
-		} catch (IOException e) {
-			e.printStackTrace();
-			log.warn("Unable to collect output in HadoopMetricsProcessor ["
-					+ recordEntry + "]", e);
-			throw e;
-		} catch (JSONException e) {
-			e.printStackTrace();
-			log.warn("Wrong format in HadoopMetricsProcessor [" + recordEntry
-					+ "]", e);
-			throw e;
-		}
-
-	}
-
-	public String getDataType() {
-		return HadoopMetricsProcessor.class.getName();
-	}
+  static Logger log = Logger.getLogger(HadoopMetricsProcessor.class);
+  static final String chukwaTimestampField = "chukwa_timestamp";
+  static final String contextNameField = "contextName";
+  static final String recordNameField = "recordName";
+
+  private SimpleDateFormat sdf = null;
+
+  public HadoopMetricsProcessor() {
+    // TODO move that to config
+    sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
+  }
+
+  /**
+   * Builds one ChukwaRecord from a metrics log entry and emits it. The entry
+   * starts with a timestamp, followed by two space-delimited tokens that are
+   * skipped, and ends with a JSON object that supplies the record fields.
+   *
+   * @throws Throwable on failure; parse, JSON and I/O errors are logged first
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  protected void parse(String recordEntry,
+      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+      throws Throwable {
+    try {
+      String dStr = recordEntry.substring(0, 23);
+      // Skip the two tokens that follow the timestamp.
+      int idx = recordEntry.indexOf(' ', 24);
+      idx = recordEntry.indexOf(' ', idx + 1);
+      // Strings are immutable: keep the stripped copy instead of dropping it.
+      String body = recordEntry.substring(idx + 1).replace("\n", "");
+      Date d = sdf.parse(dStr);
+
+      JSONObject json = new JSONObject(body);
+
+      ChukwaRecord record = new ChukwaRecord();
+      String datasource = null;
+      String recordName = null;
+
+      Iterator<String> ki = json.keys();
+      while (ki.hasNext()) {
+        String keyName = ki.next();
+        if (chukwaTimestampField.equals(keyName)) {
+          // Prefer the embedded timestamp, truncated to the minute.
+          Calendar cal = Calendar.getInstance();
+          cal.setTimeInMillis(json.getLong(keyName));
+          cal.set(Calendar.SECOND, 0);
+          cal.set(Calendar.MILLISECOND, 0);
+          d = cal.getTime();
+        } else if (contextNameField.equals(keyName)) {
+          datasource = "Hadoop_" + json.getString(keyName);
+        } else if (recordNameField.equals(keyName)) {
+          recordName = json.getString(keyName);
+          record.add(keyName, recordName);
+        } else {
+          record.add(keyName, json.getString(keyName));
+        }
+      }
+
+      datasource = datasource + "_" + recordName;
+      buildGenericRecord(record, null, d.getTime(), datasource);
+      output.collect(key, record);
+    } catch (ParseException e) {
+      log.warn("Wrong format in HadoopMetricsProcessor [" + recordEntry + "]",
+          e);
+      throw e;
+    } catch (IOException e) {
+      log.warn("Unable to collect output in HadoopMetricsProcessor ["
+          + recordEntry + "]", e);
+      throw e;
+    } catch (JSONException e) {
+      log.warn("Wrong format in HadoopMetricsProcessor [" + recordEntry + "]",
+          e);
+      throw e;
+    }
+
+  }
+
+  public String getDataType() {
+    return HadoopMetricsProcessor.class.getName();
+  }
 
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java Wed Mar 11 22:39:26 2009
@@ -18,124 +18,118 @@
 
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
+
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
 
-public class Iostat extends AbstractProcessor
-{
-	static Logger log = Logger.getLogger(Iostat.class);
-	public final String recordType = this.getClass().getName();
-	
-	private static String regex="([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}) (.*?) (.*?): (.*?) \\((.*?)\\)";
-	private static Pattern p = null;
-	
-	private Matcher matcher = null;
-	private SimpleDateFormat sdf = null;
-
-	public Iostat()
-	{
-		//TODO move that to config
-		sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
-		p = Pattern.compile(regex);
-	}
-
-	@Override
-	protected void parse(String recordEntry, OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
-			Reporter reporter)
-  throws Throwable
-	{
-		
-		log.debug("Iostat record: [" + recordEntry + "] type[" + chunk.getDataType() + "]");
-		int i = 0;
-
-		matcher=p.matcher(recordEntry);
-		while (matcher.find())
-		{
-			log.debug("Iostat Processor Matches");
-			
-			try
-			{
-				Date d = sdf.parse( matcher.group(1).trim());
-				
-
-				
-				String[] lines = recordEntry.split("\n");
-				String[] headers = null;
-				for(int skip=0;skip<2;skip++) {
-				    i++;
-					while ( i < lines.length && lines[i].indexOf("avg-cpu")<0) {
-					    // Skip the first output because the numbers are averaged from system boot up
-					    log.debug("skip line:"+lines[i]);
-					    i++;					
-				    }
-				}
-				while (i < lines.length)
-				{
-					ChukwaRecord record = null;
-					
-					if(lines[i].indexOf("avg-cpu")>=0 || lines[i].indexOf("Device")>=0) {
-						headers = parseHeader(lines[i]);
-						i++;
-					}
-					String data[] = parseData(lines[i]);
-					if(headers[0].equals("avg-cpu:")) {
-						log.debug("Matched CPU-Utilization");
-						record = new ChukwaRecord();
-					  key = new ChukwaRecordKey();
-					  buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
-					} else if(headers[0].equals("Device:")) {
-						log.debug("Matched Iostat");
-						record = new ChukwaRecord();
-						key = new ChukwaRecordKey();
-					  buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
-					} else {
-						log.debug("No match:"+headers[0]);
-					}
-					if(record!=null) {
-						int j=0;
-						log.debug("Data Length: " + data.length);
-			            while(j<data.length) {
-			            	log.debug("header:"+headers[j]+" data:"+data[j]);
-			            	if(!headers[j].equals("avg-cpu:")) {
-			            		record.add(headers[j],data[j]);
-			            	}
-						  j++;
-			            }
-			            record.setTime(d.getTime());
-			            if(data.length>3) {
-						    output.collect(key, record);
-			            }
-					}
-					i++;
-				}
-				// End of parsing
-			} catch (Exception e)
-			{
-				e.printStackTrace();
-				throw e;
-			}
-		}
-	}
-	
-	public String[] parseHeader(String header) {
-		String[] headers = header.split("\\s+");
-		return headers;
-	}
-
-	public String[] parseData(String dataLine) {
-		String[] data = dataLine.split("\\s+");
-		return data;
-	}
-
-	public String getDataType() {
-		return recordType;
-	}
+public class Iostat extends AbstractProcessor { // Turns the avg-cpu / Device tables of a log entry into records built as "SystemMetrics".
+  static Logger log = Logger.getLogger(Iostat.class);
+  public final String recordType = this.getClass().getName(); // value handed back by getDataType()
+  // Entry prefix: "yyyy-MM-dd HH:mm:ss,SSS" timestamp, two tokens, a "name:" token, free text, then "(...)".
+  private static String regex = "([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}) (.*?) (.*?): (.*?) \\((.*?)\\)";
+  private static Pattern p = null; // NOTE(review): static, yet reassigned by every constructor call
+  // Per-instance parsing state.
+  private Matcher matcher = null;
+  private SimpleDateFormat sdf = null;
+  // Creates the date format and compiles the entry pattern.
+  public Iostat() {
+    // TODO move that to config
+    sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm"); // format stops at minutes, so the seconds/millis in the matched timestamp are not parsed
+    p = Pattern.compile(regex);
+  }
+  // Parses one log entry and collects a record for each table row that has more than three columns.
+  @Override
+  protected void parse(String recordEntry, // raw entry text; split on "\n" below
+      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) // output receives the records; reporter is not used in this method
+      throws Throwable { // any Exception raised while parsing is printed and rethrown
+    // chunk, key and buildGenericRecord are not declared in this class; presumably inherited from AbstractProcessor.
+    log.debug("Iostat record: [" + recordEntry + "] type["
+        + chunk.getDataType() + "]");
+    int i = 0; // line cursor; NOTE(review): declared outside the match loop, so it is not reset for a second match
+
+    matcher = p.matcher(recordEntry);
+    while (matcher.find()) {
+      log.debug("Iostat Processor Matches");
+      // The timestamp captured by group 1 becomes the time of every record emitted below.
+      try {
+        Date d = sdf.parse(matcher.group(1).trim());
+        // Advance i to the second line containing "avg-cpu" (searching from index 1), or to the end of the entry.
+        String[] lines = recordEntry.split("\n");
+        String[] headers = null; // column names of the table currently being read
+        for (int skip = 0; skip < 2; skip++) {
+          i++;
+          while (i < lines.length && lines[i].indexOf("avg-cpu") < 0) {
+            // Skip the first output because the numbers are averaged from
+            // system boot up
+            log.debug("skip line:" + lines[i]);
+            i++;
+          }
+        }
+        while (i < lines.length) {
+          ChukwaRecord record = null; // stays null when the active header is neither "avg-cpu:" nor "Device:"
+          // A header line replaces the active column names; the line after it is then read as data.
+          if (lines[i].indexOf("avg-cpu") >= 0
+              || lines[i].indexOf("Device") >= 0) {
+            headers = parseHeader(lines[i]);
+            i++;
+          }
+          String data[] = parseData(lines[i]); // NOTE(review): no bounds check after the i++ above; a header on the last line would overrun lines
+          if (headers[0].equals("avg-cpu:")) {
+            log.debug("Matched CPU-Utilization");
+            record = new ChukwaRecord();
+            key = new ChukwaRecordKey();
+            buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
+          } else if (headers[0].equals("Device:")) {
+            log.debug("Matched Iostat");
+            record = new ChukwaRecord();
+            key = new ChukwaRecordKey();
+            buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
+          } else {
+            log.debug("No match:" + headers[0]);
+          }
+          if (record != null) {
+            int j = 0; // column cursor shared by headers and data
+            log.debug("Data Length: " + data.length);
+            while (j < data.length) {
+              log.debug("header:" + headers[j] + " data:" + data[j]); // NOTE(review): headers[j] is bounded only by data.length; assumes a row never has more columns than its header
+              if (!headers[j].equals("avg-cpu:")) { // the column under the "avg-cpu:" label is not stored
+                record.add(headers[j], data[j]);
+              }
+              j++;
+            }
+            record.setTime(d.getTime());
+            if (data.length > 3) { // rows with three or fewer columns are built but never collected
+              output.collect(key, record);
+            }
+          }
+          i++;
+        }
+        // End of parsing
+      } catch (Exception e) {
+        e.printStackTrace(); // printed to stderr, then propagated to the caller unchanged
+        throw e;
+      }
+    }
+  }
+  // Splits a header line into column names on runs of whitespace.
+  public String[] parseHeader(String header) {
+    String[] headers = header.split("\\s+");
+    return headers;
+  }
+  // Splits a data line into column values on runs of whitespace.
+  public String[] parseData(String dataLine) {
+    String[] data = dataLine.split("\\s+");
+    return data;
+  }
+  // Returns recordType, i.e. the runtime class name captured at construction.
+  public String getDataType() {
+    return recordType;
+  }
 }
\ No newline at end of file

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java Wed Mar 11 22:39:26 2009
@@ -1,15 +1,14 @@
 package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
+
 import java.io.File;
 import java.io.FileOutputStream;
 import java.util.Calendar;
 import java.util.Random;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
-
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
@@ -22,99 +21,101 @@
 import org.w3c.dom.Text;
 
 public class JobConfProcessor extends AbstractProcessor {
-	static Logger log = Logger.getLogger(JobConfProcessor.class);
-	static  Pattern timePattern = Pattern.compile("(.*)?time=\"(.*?)\"(.*)?");
-    static  Pattern hodPattern = Pattern.compile("(.*?)/(.*?)\\.(\\d+)\\.(.*?)\\.hodring/(.*?)");
-    static  Pattern jobPattern = Pattern.compile("(.*?)job_(.*?)_conf\\.xml(.*?)");
-	@Override
-	protected void parse(String recordEntry,
-			OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
-			Reporter reporter) 
-	 throws Throwable
-	{
-		Long time = 0L;
-		Random randomNumber = new Random();
-		String tags = this.chunk.getTags();
-
-		Matcher matcher = timePattern.matcher(tags);
-		if (matcher.matches()) {
-			time = Long.parseLong(matcher.group(2));
-		}
-		String capp = this.chunk.getApplication();
-    	String hodID = "";
-    	String jobID = "";
-        matcher = hodPattern.matcher(capp);
-        if(matcher.matches()) {
-        	hodID=matcher.group(3);
+  static Logger log = Logger.getLogger(JobConfProcessor.class);
+  static Pattern timePattern = Pattern.compile("(.*)?time=\"(.*?)\"(.*)?");
+  static Pattern hodPattern = Pattern
+      .compile("(.*?)/(.*?)\\.(\\d+)\\.(.*?)\\.hodring/(.*?)");
+  static Pattern jobPattern = Pattern.compile("(.*?)job_(.*?)_conf\\.xml(.*?)");
+
+  @Override // Flattens the name/value pairs of an XML job configuration entry into one record built as "JobConf".
+  protected void parse(String recordEntry, // XML text of the configuration
+      OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) // output receives the single record; reporter is not used in this method
+      throws Throwable { // any Exception raised inside the try block is printed and rethrown
+    Long time = 0L; // stays 0 when the chunk tags do not match timePattern
+    Random randomNumber = new Random(); // only used to name the scratch file below
+    String tags = this.chunk.getTags();
+    // The record time is taken from the time="..." attribute found in the chunk tags.
+    Matcher matcher = timePattern.matcher(tags);
+    if (matcher.matches()) {
+      time = Long.parseLong(matcher.group(2));
+    }
+    String capp = this.chunk.getApplication();
+    String hodID = ""; // left empty when hodPattern does not match
+    String jobID = ""; // left empty when jobPattern does not match
+    matcher = hodPattern.matcher(capp);
+    if (matcher.matches()) {
+      hodID = matcher.group(3); // the digit run captured by (\d+) in hodPattern
+    }
+    matcher = jobPattern.matcher(capp);
+    if (matcher.matches()) {
+      jobID = matcher.group(2); // text between "job_" and "_conf.xml"
+    }
+    ChukwaRecord record = new ChukwaRecord();
+    DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory
+        .newInstance(); // NOTE(review): apart from comment handling the factory keeps its defaults; confirm external entities are acceptable for this input
+    // ignore all comments inside the xml file
+    docBuilderFactory.setIgnoringComments(true);
+    try {
+      DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
+      Document doc = null;
+      String fileName = "test_" + randomNumber.nextInt(); // scratch file name, relative to the working directory
+      File tmp = new File(fileName);
+      FileOutputStream out = new FileOutputStream(tmp);
+      out.write(recordEntry.getBytes()); // getBytes() with no argument uses the platform default charset
+      out.close(); // NOTE(review): not reached if write() throws
+      doc = builder.parse(fileName); // DocumentBuilder.parse(String) takes a URI; the relative file name is passed as-is
+      Element root = doc.getDocumentElement();
+      if (!"configuration".equals(root.getTagName()))
+        log.fatal("bad conf file: top-level element not <configuration>"); // logged only; processing continues
+      NodeList props = root.getChildNodes();
+      // Every element child of the root is read for its name / value / final children.
+      for (int i = 0; i < props.getLength(); i++) {
+        Node propNode = props.item(i);
+        if (!(propNode instanceof Element))
+          continue;
+        Element prop = (Element) propNode;
+        if (!"property".equals(prop.getTagName()))
+          log.warn("bad conf file: element not <property>"); // logged only; the element is still processed
+        NodeList fields = prop.getChildNodes();
+        String attr = null;
+        String value = null;
+        boolean finalParameter = false; // assigned in the loop below but never read afterwards
+        for (int j = 0; j < fields.getLength(); j++) {
+          Node fieldNode = fields.item(j);
+          if (!(fieldNode instanceof Element))
+            continue;
+          Element field = (Element) fieldNode;
+          if ("name".equals(field.getTagName()) && field.hasChildNodes())
+            attr = ((Text) field.getFirstChild()).getData().trim(); // casts the first child to Text without checking its type
+          if ("value".equals(field.getTagName()) && field.hasChildNodes())
+            value = ((Text) field.getFirstChild()).getData();
+          if ("final".equals(field.getTagName()) && field.hasChildNodes())
+            finalParameter = "true".equals(((Text) field.getFirstChild())
+                .getData());
         }
-        matcher = jobPattern.matcher(capp);
-        if(matcher.matches()) {
-        	jobID=matcher.group(2);
+        // NOTE(review): the check below only requires a name and a value; it does not consult finalParameter.
+        // Ignore this parameter if it has already been marked as 'final'
+        if (attr != null && value != null) {
+          record.add(attr, value);
         }
-		ChukwaRecord record = new ChukwaRecord();
-	    DocumentBuilderFactory docBuilderFactory 
-		    = DocumentBuilderFactory.newInstance();
-	    //ignore all comments inside the xml file
-	    docBuilderFactory.setIgnoringComments(true);
-	    try {
-	        DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
- 	        Document doc = null;
- 	        String fileName = "test_"+randomNumber.nextInt();
- 	        File tmp = new File(fileName);
- 	        FileOutputStream out = new FileOutputStream(tmp);
- 	        out.write(recordEntry.getBytes());
- 	        out.close();
-		    doc = builder.parse(fileName);
-		    Element root = doc.getDocumentElement();
-		    if (!"configuration".equals(root.getTagName()))
-		        log.fatal("bad conf file: top-level element not <configuration>");
-		    NodeList props = root.getChildNodes();
-		
-		    for (int i = 0; i < props.getLength(); i++) {
-		        Node propNode = props.item(i);
-		        if (!(propNode instanceof Element))
-		            continue;
-		        Element prop = (Element)propNode;
-		        if (!"property".equals(prop.getTagName()))
-		            log.warn("bad conf file: element not <property>");
-		        NodeList fields = prop.getChildNodes();
-		        String attr = null;
-		        String value = null;
-		        boolean finalParameter = false;
-		        for (int j = 0; j < fields.getLength(); j++) {
-		            Node fieldNode = fields.item(j);
-		            if (!(fieldNode instanceof Element))
-		                continue;
-		            Element field = (Element)fieldNode;
-		            if ("name".equals(field.getTagName()) && field.hasChildNodes())
-		                attr = ((Text)field.getFirstChild()).getData().trim();
-		            if ("value".equals(field.getTagName()) && field.hasChildNodes())
-		                value = ((Text)field.getFirstChild()).getData();
-		            if ("final".equals(field.getTagName()) && field.hasChildNodes())
-		                finalParameter = "true".equals(((Text)field.getFirstChild()).getData());
-		        }
-		        
-		        // Ignore this parameter if it has already been marked as 'final'
-		        if (attr != null && value != null) {
-		            record.add(attr, value);
-		        }
-		    }
-		    buildGenericRecord(record,null, time,"JobConf");
-			calendar.setTimeInMillis(time);
-			calendar.set(Calendar.MINUTE, 0);
-			calendar.set(Calendar.SECOND, 0);
-			calendar.set(Calendar.MILLISECOND, 0);			
-			key.setKey(""+ calendar.getTimeInMillis() + "/" + hodID +"." + jobID + "/" + time);
-				        
-            output.collect(key,record);
-            tmp.delete();
-	    } catch(Exception e) {
-	        e.printStackTrace();	
-	        throw e;
-	    }
-	}
-	
-	public String getDataType() {
-		return Torque.class.getName();
-	}
+      }
+      buildGenericRecord(record, null, time, "JobConf"); // buildGenericRecord, calendar and key are not declared in this class; presumably inherited from AbstractProcessor
+      calendar.setTimeInMillis(time);
+      calendar.set(Calendar.MINUTE, 0); // the three set() calls truncate the time to the start of its hour
+      calendar.set(Calendar.SECOND, 0);
+      calendar.set(Calendar.MILLISECOND, 0);
+      key.setKey("" + calendar.getTimeInMillis() + "/" + hodID + "." + jobID
+          + "/" + time); // key layout: <hour-truncated millis>/<hodID>.<jobID>/<time>
+
+      output.collect(key, record);
+      tmp.delete(); // NOTE(review): skipped when anything above throws, which leaves the scratch file behind
+    } catch (Exception e) {
+      e.printStackTrace(); // printed to stderr, then propagated to the caller unchanged
+      throw e;
+    }
+  }
+
+  public String getDataType() { // Reports the data type string for this processor.
+    return Torque.class.getName(); // NOTE(review): this is Torque's class name, not JobConfProcessor's; confirm that is intended
+  }
 }