Posted to commits@chukwa.apache.org by as...@apache.org on 2009/03/11 23:39:32 UTC
svn commit: r752666 [7/16] - in /hadoop/chukwa/trunk: ./
src/java/org/apache/hadoop/chukwa/ src/java/org/apache/hadoop/chukwa/conf/
src/java/org/apache/hadoop/chukwa/database/
src/java/org/apache/hadoop/chukwa/datacollection/
src/java/org/apache/hadoop...
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveToRepository.java Wed Mar 11 22:39:26 2009
@@ -18,11 +18,11 @@
package org.apache.hadoop.chukwa.extraction.demux;
+
import java.io.IOException;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Calendar;
-
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -34,251 +34,237 @@
// First version of the Spill
// need some polishing
-public class MoveToRepository
-{
- static Logger log = Logger.getLogger(MoveToRepository.class);
-
- static ChukwaConfiguration conf = null;
- static FileSystem fs = null;
- static final String HadoopLogDir = "_logs";
- static final String hadoopTempDir = "_temporary";
- static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
- static Calendar calendar = Calendar.getInstance();
-
- static void processClutserDirectory(Path srcDir,String destDir) throws Exception
- {
- log.info("processClutserDirectory (" + srcDir.getName() + "," + destDir +")");
- FileStatus fstat = fs.getFileStatus(srcDir);
-
- if (!fstat.isDir())
- {
- throw new IOException(srcDir + " is not a directory!");
- }
- else
- {
- FileStatus[] datasourceDirectories = fs.listStatus(srcDir);
-
- for(FileStatus datasourceDirectory : datasourceDirectories)
- {
- log.info(datasourceDirectory.getPath() + " isDir?" +datasourceDirectory.isDir());
- if (!datasourceDirectory.isDir())
- {
- throw new IOException("Top level datasource directory should be a directory :" + datasourceDirectory.getPath());
- }
-
- String dirName = datasourceDirectory.getPath().getName();
- Path destPath = new Path(destDir + "/" + dirName);
- log.info("dest directory path: " + destPath);
- log.info("processClutserDirectory processing Datasource: (" + dirName +")");
- processDatasourceDirectory(srcDir.getName(),datasourceDirectory.getPath(),destDir + "/" + dirName);
- }
- }
- }
-
- static void processDatasourceDirectory(String cluster,Path srcDir,String destDir) throws Exception
- {
- String fileName = null;
- int fileDay = 0;
- int fileHour = 0;
- int fileMin = 0;
-
- FileStatus[] recordFiles = fs.listStatus(srcDir);
- for(FileStatus recordFile : recordFiles)
- {
- // dataSource_20080915_18_15.1.evt
- // <datasource>_<yyyyMMdd_HH_mm>.1.evt
-
- fileName = recordFile.getPath().getName();
- log.info("processDatasourceDirectory processing RecordFile: (" + fileName +")");
- log.info("fileName: " + fileName);
-
-
- int l = fileName.length();
- String dataSource = srcDir.getName();
- log.info("Datasource: " + dataSource);
-
- if (fileName.endsWith(".D.evt"))
- {
- // Hadoop_dfs_datanode_20080919.D.evt
-
- fileDay = Integer.parseInt(fileName.substring(l-14,l-6));
- writeRecordFile(destDir + "/" + fileDay + "/", recordFile.getPath(),dataSource + "_" +fileDay);
- // mark this directory for Daily rotate (re-process)
- addDirectory4Rolling( true,fileDay , fileHour, cluster , dataSource);
- }
- else if (fileName.endsWith(".H.evt"))
- {
- // Hadoop_dfs_datanode_20080925_1.H.evt
- // Hadoop_dfs_datanode_20080925_12.H.evt
-
- String day = null;
- String hour = null;
- if (fileName.charAt(l-8) == '_')
- {
- day = fileName.substring(l-16,l-8);
- log.info("day->" + day);
- hour = "" +fileName.charAt(l-7);
- log.info("hour->" +hour);
- }
- else
- {
- day = fileName.substring(l-17,l-9);
- log.info("day->" +day);
- hour = fileName.substring(l-8,l-6);
- log.info("hour->" +hour);
- }
- fileDay = Integer.parseInt(day);
- fileHour = Integer.parseInt(hour);
- // rotate there so spill
- writeRecordFile(destDir + "/" + fileDay + "/" + fileHour + "/", recordFile.getPath(),dataSource + "_" +fileDay+ "_" + fileHour );
- // mark this directory for daily rotate
- addDirectory4Rolling( true,fileDay , fileHour, cluster , dataSource);
- }
- else if (fileName.endsWith(".R.evt"))
- {
- if (fileName.charAt(l-11) == '_')
- {
- fileDay = Integer.parseInt(fileName.substring(l-19,l-11));
- fileHour = Integer.parseInt(""+fileName.charAt(l-10));
- fileMin = Integer.parseInt(fileName.substring(l-8,l-6));
- }
- else
- {
- fileDay = Integer.parseInt(fileName.substring(l-20,l-12));
- fileHour = Integer.parseInt(fileName.substring(l-11,l-9));
- fileMin = Integer.parseInt(fileName.substring(l-8,l-6));
- }
-
- log.info("fileDay: " + fileDay);
- log.info("fileHour: " + fileHour);
- log.info("fileMin: " + fileMin);
- writeRecordFile(destDir + "/" + fileDay + "/" + fileHour + "/" + fileMin, recordFile.getPath(),dataSource + "_" +fileDay+ "_" + fileHour +"_" +fileMin);
- // mark this directory for hourly rotate
- addDirectory4Rolling( false,fileDay , fileHour, cluster , dataSource);
- }
- else
- {
- throw new RuntimeException("Wrong fileName format! [" + fileName+"]");
- }
- }
- }
-
- static void addDirectory4Rolling(boolean isDailyOnly, int day,int hour,String cluster, String dataSource) throws IOException
- {
- // TODO get root directory from config
- String rollingDirectory = "/chukwa/rolling/";
-
- Path path = new Path(rollingDirectory + "/daily/" + day + "/" + cluster +"/" + dataSource);
- if (!fs.exists(path))
- { fs.mkdirs(path);}
-
- if (!isDailyOnly)
- {
- path = new Path(rollingDirectory + "/hourly/" + day + "/" + hour + "/" + cluster +"/" + dataSource);
- if (!fs.exists(path))
- { fs.mkdirs(path);}
- }
- }
-
- static void writeRecordFile(String destDir,Path recordFile,String fileName) throws IOException
- {
- boolean done = false;
- int count = 1;
- do
- {
- Path destDirPath = new Path(destDir );
- Path destFilePath = new Path(destDir + "/" + fileName + "." + count + ".evt" );
-
- if (!fs.exists(destDirPath))
- {
- fs.mkdirs(destDirPath);
- log.info(">>>>>>>>>>>> create Dir" + destDirPath);
- }
-
- if (!fs.exists(destFilePath))
- {
- log.info(">>>>>>>>>>>> Before Rename" + recordFile + " -- "+ destFilePath);
- //fs.rename(recordFile,destFilePath);
- FileUtil.copy(fs,recordFile,fs,destFilePath,false,false,conf);
- //FileUtil.replaceFile(new File(recordFile.toUri()), new File(destFilePath.toUri()));
- done = true;
- log.info(">>>>>>>>>>>> after Rename" + destFilePath);
- }
- else
- {
- log.info("Start MoveToRepository main()");
- }
- count ++;
- // Just put a limit here
- // TODO read from config
- if (count > 1000)
- {
- throw new IOException("too many files in this directory: " + destDir);
- }
- } while (!done);
- }
-
- static boolean checkRotate(String directoryAsString, boolean createDirectoryIfNotExist) throws IOException
- {
- Path directory = new Path(directoryAsString);
- boolean exist = fs.exists(directory);
-
- if (! exist )
- {
- if (createDirectoryIfNotExist== true)
- { fs.mkdirs(directory); }
- return false;
- }
- else
- {
- return fs.exists(new Path(directoryAsString + "/rotateDone"));
- }
- }
-
- /**
- * @param args
- * @throws Exception
- */
- public static void main(String[] args) throws Exception
- {
- conf = new ChukwaConfiguration();
- String fsName = conf.get("writer.hdfs.filesystem");
- fs = FileSystem.get(new URI(fsName), conf);
-
- Path srcDir = new Path(args[0]);
- String destDir = args[1];
-
- log.info("Start MoveToRepository main()");
-
- FileStatus fstat = fs.getFileStatus(srcDir);
-
- if (!fstat.isDir())
- {
- throw new IOException(srcDir + " is not a directory!");
- }
- else
- {
- FileStatus[] clusters = fs.listStatus(srcDir);
- // Run a moveOrMerge on all clusters
- String name = null;
- for(FileStatus cluster : clusters)
- {
- name = cluster.getPath().getName();
- // Skip hadoop M/R outputDir
- if ( (name.intern() == HadoopLogDir.intern() ) || (name.intern() == hadoopTempDir.intern()) )
- {
- continue;
- }
- log.info("main procesing Cluster (" + cluster.getPath().getName() +")");
- processClutserDirectory(cluster.getPath(),destDir + "/" + cluster.getPath().getName());
-
- // Delete the demux's cluster dir
- FileUtil.fullyDelete(fs,cluster.getPath());
- }
- }
-
- log.info("Done with MoveToRepository main()");
+public class MoveToRepository {
+ static Logger log = Logger.getLogger(MoveToRepository.class);
+
+ static ChukwaConfiguration conf = null;
+ static FileSystem fs = null;
+ static final String HadoopLogDir = "_logs";
+ static final String hadoopTempDir = "_temporary";
+ static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
+ static Calendar calendar = Calendar.getInstance();
+
+ static void processClutserDirectory(Path srcDir, String destDir)
+ throws Exception {
+ log.info("processClutserDirectory (" + srcDir.getName() + "," + destDir
+ + ")");
+ FileStatus fstat = fs.getFileStatus(srcDir);
+
+ if (!fstat.isDir()) {
+ throw new IOException(srcDir + " is not a directory!");
+ } else {
+ FileStatus[] datasourceDirectories = fs.listStatus(srcDir);
+
+ for (FileStatus datasourceDirectory : datasourceDirectories) {
+ log.info(datasourceDirectory.getPath() + " isDir?"
+ + datasourceDirectory.isDir());
+ if (!datasourceDirectory.isDir()) {
+ throw new IOException(
+ "Top level datasource directory should be a directory :"
+ + datasourceDirectory.getPath());
+ }
+
+ String dirName = datasourceDirectory.getPath().getName();
+ Path destPath = new Path(destDir + "/" + dirName);
+ log.info("dest directory path: " + destPath);
+ log.info("processClutserDirectory processing Datasource: (" + dirName
+ + ")");
+ processDatasourceDirectory(srcDir.getName(), datasourceDirectory
+ .getPath(), destDir + "/" + dirName);
+ }
+ }
+ }
+
+ static void processDatasourceDirectory(String cluster, Path srcDir,
+ String destDir) throws Exception {
+ String fileName = null;
+ int fileDay = 0;
+ int fileHour = 0;
+ int fileMin = 0;
+
+ FileStatus[] recordFiles = fs.listStatus(srcDir);
+ for (FileStatus recordFile : recordFiles) {
+ // dataSource_20080915_18_15.1.evt
+ // <datasource>_<yyyyMMdd_HH_mm>.1.evt
+
+ fileName = recordFile.getPath().getName();
+ log.info("processDatasourceDirectory processing RecordFile: (" + fileName
+ + ")");
+ log.info("fileName: " + fileName);
+
+ int l = fileName.length();
+ String dataSource = srcDir.getName();
+ log.info("Datasource: " + dataSource);
+
+ if (fileName.endsWith(".D.evt")) {
+ // Hadoop_dfs_datanode_20080919.D.evt
+
+ fileDay = Integer.parseInt(fileName.substring(l - 14, l - 6));
+ writeRecordFile(destDir + "/" + fileDay + "/", recordFile.getPath(),
+ dataSource + "_" + fileDay);
+ // mark this directory for Daily rotate (re-process)
+ addDirectory4Rolling(true, fileDay, fileHour, cluster, dataSource);
+ } else if (fileName.endsWith(".H.evt")) {
+ // Hadoop_dfs_datanode_20080925_1.H.evt
+ // Hadoop_dfs_datanode_20080925_12.H.evt
+
+ String day = null;
+ String hour = null;
+ if (fileName.charAt(l - 8) == '_') {
+ day = fileName.substring(l - 16, l - 8);
+ log.info("day->" + day);
+ hour = "" + fileName.charAt(l - 7);
+ log.info("hour->" + hour);
+ } else {
+ day = fileName.substring(l - 17, l - 9);
+ log.info("day->" + day);
+ hour = fileName.substring(l - 8, l - 6);
+ log.info("hour->" + hour);
+ }
+ fileDay = Integer.parseInt(day);
+ fileHour = Integer.parseInt(hour);
+ // rotate there so spill
+ writeRecordFile(destDir + "/" + fileDay + "/" + fileHour + "/",
+ recordFile.getPath(), dataSource + "_" + fileDay + "_" + fileHour);
+ // mark this directory for daily rotate
+ addDirectory4Rolling(true, fileDay, fileHour, cluster, dataSource);
+ } else if (fileName.endsWith(".R.evt")) {
+ if (fileName.charAt(l - 11) == '_') {
+ fileDay = Integer.parseInt(fileName.substring(l - 19, l - 11));
+ fileHour = Integer.parseInt("" + fileName.charAt(l - 10));
+ fileMin = Integer.parseInt(fileName.substring(l - 8, l - 6));
+ } else {
+ fileDay = Integer.parseInt(fileName.substring(l - 20, l - 12));
+ fileHour = Integer.parseInt(fileName.substring(l - 11, l - 9));
+ fileMin = Integer.parseInt(fileName.substring(l - 8, l - 6));
+ }
+
+ log.info("fileDay: " + fileDay);
+ log.info("fileHour: " + fileHour);
+ log.info("fileMin: " + fileMin);
+ writeRecordFile(destDir + "/" + fileDay + "/" + fileHour + "/"
+ + fileMin, recordFile.getPath(), dataSource + "_" + fileDay + "_"
+ + fileHour + "_" + fileMin);
+ // mark this directory for hourly rotate
+ addDirectory4Rolling(false, fileDay, fileHour, cluster, dataSource);
+ } else {
+ throw new RuntimeException("Wrong fileName format! [" + fileName + "]");
+ }
+ }
+ }
+
+ static void addDirectory4Rolling(boolean isDailyOnly, int day, int hour,
+ String cluster, String dataSource) throws IOException {
+ // TODO get root directory from config
+ String rollingDirectory = "/chukwa/rolling/";
+
+ Path path = new Path(rollingDirectory + "/daily/" + day + "/" + cluster
+ + "/" + dataSource);
+ if (!fs.exists(path)) {
+ fs.mkdirs(path);
+ }
+
+ if (!isDailyOnly) {
+ path = new Path(rollingDirectory + "/hourly/" + day + "/" + hour + "/"
+ + cluster + "/" + dataSource);
+ if (!fs.exists(path)) {
+ fs.mkdirs(path);
+ }
+ }
+ }
+
+ static void writeRecordFile(String destDir, Path recordFile, String fileName)
+ throws IOException {
+ boolean done = false;
+ int count = 1;
+ do {
+ Path destDirPath = new Path(destDir);
+ Path destFilePath = new Path(destDir + "/" + fileName + "." + count
+ + ".evt");
+
+ if (!fs.exists(destDirPath)) {
+ fs.mkdirs(destDirPath);
+ log.info(">>>>>>>>>>>> create Dir" + destDirPath);
+ }
+
+ if (!fs.exists(destFilePath)) {
+ log.info(">>>>>>>>>>>> Before Rename" + recordFile + " -- "
+ + destFilePath);
+ // fs.rename(recordFile,destFilePath);
+ FileUtil.copy(fs, recordFile, fs, destFilePath, false, false, conf);
+ // FileUtil.replaceFile(new File(recordFile.toUri()), new
+ // File(destFilePath.toUri()));
+ done = true;
+ log.info(">>>>>>>>>>>> after Rename" + destFilePath);
+ } else {
+ log.info("Start MoveToRepository main()");
+ }
+ count++;
+ // Just put a limit here
+ // TODO read from config
+ if (count > 1000) {
+ throw new IOException("too many files in this directory: " + destDir);
+ }
+ } while (!done);
+ }
+
+ static boolean checkRotate(String directoryAsString,
+ boolean createDirectoryIfNotExist) throws IOException {
+ Path directory = new Path(directoryAsString);
+ boolean exist = fs.exists(directory);
+
+ if (!exist) {
+ if (createDirectoryIfNotExist == true) {
+ fs.mkdirs(directory);
+ }
+ return false;
+ } else {
+ return fs.exists(new Path(directoryAsString + "/rotateDone"));
+ }
+ }
+
+ /**
+ * @param args
+ * @throws Exception
+ */
+ public static void main(String[] args) throws Exception {
+ conf = new ChukwaConfiguration();
+ String fsName = conf.get("writer.hdfs.filesystem");
+ fs = FileSystem.get(new URI(fsName), conf);
+
+ Path srcDir = new Path(args[0]);
+ String destDir = args[1];
+
+ log.info("Start MoveToRepository main()");
+
+ FileStatus fstat = fs.getFileStatus(srcDir);
+
+ if (!fstat.isDir()) {
+ throw new IOException(srcDir + " is not a directory!");
+ } else {
+ FileStatus[] clusters = fs.listStatus(srcDir);
+ // Run a moveOrMerge on all clusters
+ String name = null;
+ for (FileStatus cluster : clusters) {
+ name = cluster.getPath().getName();
+ // Skip hadoop M/R outputDir
+ if ((name.intern() == HadoopLogDir.intern())
+ || (name.intern() == hadoopTempDir.intern())) {
+ continue;
+ }
+ log
+ .info("main procesing Cluster (" + cluster.getPath().getName()
+ + ")");
+ processClutserDirectory(cluster.getPath(), destDir + "/"
+ + cluster.getPath().getName());
+
+ // Delete the demux's cluster dir
+ FileUtil.fullyDelete(fs, cluster.getPath());
+ }
+ }
+
+ log.info("Done with MoveToRepository main()");
- }
+ }
}
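
Illustration only, not part of the commit: processDatasourceDirectory above
recovers day, hour, and minute with substring arithmetic keyed off the
file-name length. The same <datasource>_<yyyyMMdd>[_<H|HH>[_<mm>]].<D|H|R>.evt
convention can be expressed as one regular expression; the class and pattern
names below are hypothetical.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class EvtNameParser {
  // <datasource>_<yyyyMMdd>[_<H|HH>[_<mm>]].<D|H|R>.evt
  private static final Pattern EVT = Pattern
      .compile("_(\\d{8})(?:_(\\d{1,2})(?:_(\\d{2}))?)?\\.([DHR])\\.evt$");

  public static void main(String[] args) {
    String[] samples = { "Hadoop_dfs_datanode_20080919.D.evt",
        "Hadoop_dfs_datanode_20080925_1.H.evt",
        "Hadoop_dfs_datanode_20080925_12.H.evt",
        "Hadoop_dfs_datanode_20080915_18_15.R.evt" };
    for (String name : samples) {
      Matcher m = EVT.matcher(name);
      if (!m.find()) {
        throw new RuntimeException("Wrong fileName format! [" + name + "]");
      }
      System.out.println(name + " -> day=" + m.group(1) + " hour="
          + m.group(2) + " min=" + m.group(3) + " type=" + m.group(4));
    }
  }
}

The single-digit versus two-digit hour cases that force the charAt checks in
the committed code collapse into the (\d{1,2}) group.
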
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/RecordMerger.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/RecordMerger.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/RecordMerger.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/RecordMerger.java Wed Mar 11 22:39:26 2009
@@ -18,8 +18,8 @@
package org.apache.hadoop.chukwa.extraction.demux;
-import java.io.IOException;
+import java.io.IOException;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -29,115 +29,105 @@
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
-public class RecordMerger extends Thread
-{
- static Logger log = Logger.getLogger(RecordMerger.class);
- ChukwaConfiguration conf = null;
- FileSystem fs = null;
- String[] mergeArgs = null;
- Tool tool = null;
- boolean deleteRawData = false;
-
- public RecordMerger(ChukwaConfiguration conf,FileSystem fs,Tool tool,String[] mergeArgs,boolean deleteRawData)
- {
- this.conf = conf;
- this.fs = fs;
- this.tool = tool;
- this.mergeArgs = mergeArgs;
- this.deleteRawData = deleteRawData;
- }
- @Override
- public void run()
- {
- System.out.println("\t Running Merge! : output [" + mergeArgs[1] +"]");
- int res;
- try
- {
- res = ToolRunner.run(conf,tool, mergeArgs);
- System.out.println("MR exit status: " + res);
- if (res == 0)
- {
- writeRecordFile(mergeArgs[1]+"/part-00000",mergeArgs[2],mergeArgs[3]);
-
- // delete input
- if (deleteRawData)
- {
- FileUtil.fullyDelete(fs,new Path(mergeArgs[0]));
-
- Path hours = new Path(mergeArgs[2]) ;
- FileStatus[] hoursOrMinutesFS = fs.listStatus(hours);
- for(FileStatus hourOrMinuteFS : hoursOrMinutesFS)
- {
- String dirName = hourOrMinuteFS.getPath().getName();
-
- try
- {
- Integer.parseInt(dirName);
- FileUtil.fullyDelete(fs,new Path(mergeArgs[2] + "/" + dirName));
- if (log.isDebugEnabled() )
- { log.debug("Deleting Hour directory: " + mergeArgs[2] + "/" + dirName); }
- }
- catch(NumberFormatException e) { /* Not an Hour or Minutes directory- Do nothing */ }
- }
- }
-
- // delete rolling tag
- FileUtil.fullyDelete(fs, new Path(mergeArgs[3]));
- // delete M/R temp directory
- FileUtil.fullyDelete(fs, new Path(mergeArgs[1]));
- }
- else
- {
- throw new RuntimeException("Error in M/R merge operation!");
- }
-
- }
- catch (Exception e)
- {
- e.printStackTrace();
- throw new RuntimeException("Error in M/R merge operation!",e);
- }
- }
-
-
- void writeRecordFile(String input,String outputDir,String fileName) throws IOException
- {
- boolean done = false;
- int count = 1;
- Path recordFile = new Path(input);
- do
- {
- Path destDirPath = new Path(outputDir );
- Path destFilePath = new Path(outputDir + "/" + fileName + "." + count + ".evt" );
-
- if (!fs.exists(destDirPath))
- {
- fs.mkdirs(destDirPath);
- log.info(">>>>>>>>>>>> create Dir" + destDirPath);
- }
-
- if (!fs.exists(destFilePath))
- {
- boolean res = fs.rename(recordFile,destFilePath);
-
- if (res == false)
- {
- log.info(">>>>>>>>>>>> Use standard copy rename failded");
- FileUtil.copy(fs,recordFile,fs,destFilePath,false,false,conf);
- }
- done = true;
- }
- else
- {
- log.info("Start MoveToRepository main()");
- }
- count ++;
- // Just put a limit here
- // TODO read from config
- if (count > 1000)
- {
- throw new IOException("too many files in this directory: " + destDirPath);
- }
- } while (!done);
- }
+public class RecordMerger extends Thread {
+ static Logger log = Logger.getLogger(RecordMerger.class);
+ ChukwaConfiguration conf = null;
+ FileSystem fs = null;
+ String[] mergeArgs = null;
+ Tool tool = null;
+ boolean deleteRawData = false;
+
+ public RecordMerger(ChukwaConfiguration conf, FileSystem fs, Tool tool,
+ String[] mergeArgs, boolean deleteRawData) {
+ this.conf = conf;
+ this.fs = fs;
+ this.tool = tool;
+ this.mergeArgs = mergeArgs;
+ this.deleteRawData = deleteRawData;
+ }
+
+ @Override
+ public void run() {
+ System.out.println("\t Running Merge! : output [" + mergeArgs[1] + "]");
+ int res;
+ try {
+ res = ToolRunner.run(conf, tool, mergeArgs);
+ System.out.println("MR exit status: " + res);
+ if (res == 0) {
+ writeRecordFile(mergeArgs[1] + "/part-00000", mergeArgs[2],
+ mergeArgs[3]);
+
+ // delete input
+ if (deleteRawData) {
+ FileUtil.fullyDelete(fs, new Path(mergeArgs[0]));
+
+ Path hours = new Path(mergeArgs[2]);
+ FileStatus[] hoursOrMinutesFS = fs.listStatus(hours);
+ for (FileStatus hourOrMinuteFS : hoursOrMinutesFS) {
+ String dirName = hourOrMinuteFS.getPath().getName();
+
+ try {
+ Integer.parseInt(dirName);
+ FileUtil.fullyDelete(fs, new Path(mergeArgs[2] + "/" + dirName));
+ if (log.isDebugEnabled()) {
+ log.debug("Deleting Hour directory: " + mergeArgs[2] + "/"
+ + dirName);
+ }
+ } catch (NumberFormatException e) { /*
+ * Not an Hour or Minutes
+ * directory- Do nothing
+ */
+ }
+ }
+ }
+
+ // delete rolling tag
+ FileUtil.fullyDelete(fs, new Path(mergeArgs[3]));
+ // delete M/R temp directory
+ FileUtil.fullyDelete(fs, new Path(mergeArgs[1]));
+ } else {
+ throw new RuntimeException("Error in M/R merge operation!");
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException("Error in M/R merge operation!", e);
+ }
+ }
+
+ void writeRecordFile(String input, String outputDir, String fileName)
+ throws IOException {
+ boolean done = false;
+ int count = 1;
+ Path recordFile = new Path(input);
+ do {
+ Path destDirPath = new Path(outputDir);
+ Path destFilePath = new Path(outputDir + "/" + fileName + "." + count
+ + ".evt");
+
+ if (!fs.exists(destDirPath)) {
+ fs.mkdirs(destDirPath);
+ log.info(">>>>>>>>>>>> create Dir" + destDirPath);
+ }
+
+ if (!fs.exists(destFilePath)) {
+ boolean res = fs.rename(recordFile, destFilePath);
+
+ if (res == false) {
+ log.info(">>>>>>>>>>>> Use standard copy rename failded");
+ FileUtil.copy(fs, recordFile, fs, destFilePath, false, false, conf);
+ }
+ done = true;
+ } else {
+ log.info("Start MoveToRepository main()");
+ }
+ count++;
+ // Just put a limit here
+ // TODO read from config
+ if (count > 1000) {
+ throw new IOException("too many files in this directory: "
+ + destDirPath);
+ }
+ } while (!done);
+ }
}
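
The mergeArgs array consumed by run() is positional. A short driver sketch;
the argument roles are inferred from run() and the paths are invented
examples, not taken from this commit:

import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.extraction.demux.RecordMerger;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.util.Tool;

public class RecordMergerDriver {
  // Roles inferred from RecordMerger.run():
  // [0] raw input dir (deleted when deleteRawData is true)
  // [1] M/R output dir (part-00000 is promoted, then the dir is deleted)
  // [2] final destination dir
  // [3] merged file name, also deleted as the rolling tag path
  static void runMerge(Tool mergeJob) throws Exception {
    ChukwaConfiguration conf = new ChukwaConfiguration();
    FileSystem fs = FileSystem.get(conf);
    String[] mergeArgs = { "/chukwa/rolling/daily/20080919/cluster1/Df",
        "/chukwa/postProcess/mrTemp",
        "/chukwa/repos/cluster1/Df/20080919", "Df_20080919" };
    RecordMerger merger = new RecordMerger(conf, fs, mergeJob, mergeArgs, true);
    merger.start();
    merger.join(); // RecordMerger extends Thread
  }
}
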
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/TaggerPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/TaggerPlugin.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/TaggerPlugin.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/TaggerPlugin.java Wed Mar 11 22:39:26 2009
@@ -18,9 +18,9 @@
package org.apache.hadoop.chukwa.extraction.demux;
+
import org.apache.hadoop.chukwa.extraction.engine.Record;
-public interface TaggerPlugin
-{
- public void tag(String line, Record record);
+public interface TaggerPlugin {
+ public void tag(String line, Record record);
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/ChukwaOutputCollector.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/ChukwaOutputCollector.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/ChukwaOutputCollector.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/ChukwaOutputCollector.java Wed Mar 11 22:39:26 2009
@@ -1,20 +1,22 @@
package org.apache.hadoop.chukwa.extraction.demux.processor;
-import java.io.IOException;
+import java.io.IOException;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
-public class ChukwaOutputCollector implements OutputCollector<ChukwaRecordKey, ChukwaRecord>
-{
+public class ChukwaOutputCollector implements
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> {
private OutputCollector<ChukwaRecordKey, ChukwaRecord> outputCollector = null;
private Reporter reporter = null;
private String groupName = null;
-
- public ChukwaOutputCollector(String groupName,OutputCollector<ChukwaRecordKey, ChukwaRecord> outputCollector,Reporter reporter)
- {
+
+ public ChukwaOutputCollector(
+ String groupName,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> outputCollector,
+ Reporter reporter) {
this.reporter = reporter;
this.outputCollector = outputCollector;
this.groupName = groupName;
@@ -22,12 +24,10 @@
@Override
public void collect(ChukwaRecordKey key, ChukwaRecord value)
- throws IOException
- {
+ throws IOException {
this.outputCollector.collect(key, value);
reporter.incrCounter(groupName, "total records", 1);
- reporter.incrCounter(groupName, key.getReduceType() +" records" , 1);
+ reporter.incrCounter(groupName, key.getReduceType() + " records", 1);
}
-
}
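
ChukwaOutputCollector is a decorator: collect() forwards to the wrapped
collector and bumps two counters per record. A usage sketch; the group name
"DemuxMapOutput" is an example, not taken from this commit:

import java.io.IOException;
import org.apache.hadoop.chukwa.extraction.demux.processor.ChukwaOutputCollector;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class CountingEmit {
  // Wrap the framework collector, then emit through the wrapper so each
  // record also increments "total records" and "<reduceType> records".
  static void emit(OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
      Reporter reporter, ChukwaRecordKey key, ChukwaRecord record)
      throws IOException {
    OutputCollector<ChukwaRecordKey, ChukwaRecord> counting =
        new ChukwaOutputCollector("DemuxMapOutput", output, reporter);
    counting.collect(key, record);
  }
}
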
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/Util.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/Util.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/Util.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/Util.java Wed Mar 11 22:39:26 2009
@@ -18,72 +18,58 @@
package org.apache.hadoop.chukwa.extraction.demux.processor;
+
import java.text.SimpleDateFormat;
import java.util.Calendar;
-public class Util
-{
- static SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd");
-
- static Calendar calendar = Calendar.getInstance();
- static int currentDay = 0;
- static int currentHour = 0;
-
- static
- {
- synchronized(calendar)
- {
- calendar.setTimeInMillis( System.currentTimeMillis());
- currentDay = Integer.parseInt(day.format(calendar.getTime()));
- currentHour = calendar.get(Calendar.HOUR_OF_DAY);
- }
- }
-
- public static String generateTimeOutput(long timestamp)
- {
- int workingDay = 0;
- int workingHour = 0;
-
- String output = null;
-
- int minutes = 0;
- synchronized(calendar)
- {
- calendar.setTimeInMillis( timestamp);
- workingDay = Integer.parseInt(day.format(calendar.getTime()));
- workingHour = calendar.get(Calendar.HOUR_OF_DAY);
- minutes = calendar.get(Calendar.MINUTE);
- }
-
- if (workingDay != currentDay)
- {
- output = "_" + workingDay + ".D.evt";
- }
- else
- {
- if (workingHour != currentHour)
- {
- output = "_" +workingDay + "_" + workingHour + ".H.evt";
- }
- else
- {
- output = "_" + workingDay + "_" + workingHour + "_";
- int dec = minutes/10;
- output += dec ;
-
- int m = minutes - (dec*10);
- if (m < 5)
- {
- output += "0.R.evt";
- }
- else
- {
- output += "5.R.evt";
- }
- }
- }
+public class Util {
+ static SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd");
+ static Calendar calendar = Calendar.getInstance();
+ static int currentDay = 0;
+ static int currentHour = 0;
+
+ static {
+ synchronized (calendar) {
+ calendar.setTimeInMillis(System.currentTimeMillis());
+ currentDay = Integer.parseInt(day.format(calendar.getTime()));
+ currentHour = calendar.get(Calendar.HOUR_OF_DAY);
+ }
+ }
+
+ public static String generateTimeOutput(long timestamp) {
+ int workingDay = 0;
+ int workingHour = 0;
+
+ String output = null;
+
+ int minutes = 0;
+ synchronized (calendar) {
+ calendar.setTimeInMillis(timestamp);
+ workingDay = Integer.parseInt(day.format(calendar.getTime()));
+ workingHour = calendar.get(Calendar.HOUR_OF_DAY);
+ minutes = calendar.get(Calendar.MINUTE);
+ }
+
+ if (workingDay != currentDay) {
+ output = "_" + workingDay + ".D.evt";
+ } else {
+ if (workingHour != currentHour) {
+ output = "_" + workingDay + "_" + workingHour + ".H.evt";
+ } else {
+ output = "_" + workingDay + "_" + workingHour + "_";
+ int dec = minutes / 10;
+ output += dec;
+
+ int m = minutes - (dec * 10);
+ if (m < 5) {
+ output += "0.R.evt";
+ } else {
+ output += "5.R.evt";
+ }
+ }
+ }
- return output;
- }
+ return output;
+ }
}
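
generateTimeOutput floors the minute field to a 5-minute bucket: dec =
minutes / 10 keeps the tens digit, and the units digit becomes "0" or "5", so
minute 17 yields "15" and minute 3 yields "00". A small demonstration,
illustrative only:

import java.util.Calendar;
import org.apache.hadoop.chukwa.extraction.demux.processor.Util;

public class TimeOutputDemo {
  public static void main(String[] args) {
    Calendar c = Calendar.getInstance(); // same day and hour as "now"
    c.set(Calendar.MINUTE, 17);
    // Same day and hour: prints a raw-rotate suffix such as
    // "_20090311_22_15.R.evt" (17 floors to the "15" bucket)
    System.out.println(Util.generateTimeOutput(c.getTimeInMillis()));
    c.add(Calendar.DAY_OF_MONTH, -1);
    // A different day falls back to the daily suffix, e.g. "_20090310.D.evt"
    System.out.println(Util.generateTimeOutput(c.getTimeInMillis()));
  }
}
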
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java Wed Mar 11 22:39:26 2009
@@ -18,8 +18,8 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
-import java.util.Calendar;
+import java.util.Calendar;
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
@@ -30,10 +30,9 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
-public abstract class AbstractProcessor implements MapProcessor
-{
+public abstract class AbstractProcessor implements MapProcessor {
static Logger log = Logger.getLogger(AbstractProcessor.class);
-
+
Calendar calendar = Calendar.getInstance();
byte[] bytes;
int[] recordOffsets;
@@ -48,24 +47,19 @@
OutputCollector<ChukwaRecordKey, ChukwaRecord> output = null;
Reporter reporter = null;
- public AbstractProcessor()
- {
+ public AbstractProcessor() {
}
protected abstract void parse(String recordEntry,
OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
throws Throwable;
- protected void saveChunkInError(Throwable throwable)
- {
- if (chunkInErrorSaved == false)
- {
- try
- {
+ protected void saveChunkInError(Throwable throwable) {
+ if (chunkInErrorSaved == false) {
+ try {
ChunkSaver.saveChunk(chunk, throwable, output, reporter);
chunkInErrorSaved = true;
- } catch (Exception e)
- {
+ } catch (Exception e) {
e.printStackTrace();
}
}
@@ -73,31 +67,26 @@
}
public void process(ChukwaArchiveKey archiveKey, Chunk chunk,
- OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
- {
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) {
chunkInErrorSaved = false;
-
+
this.archiveKey = archiveKey;
this.output = output;
this.reporter = reporter;
-
+
reset(chunk);
-
- while (hasNext())
- {
- try
- {
+
+ while (hasNext()) {
+ try {
parse(nextLine(), output, reporter);
- } catch (Throwable e)
- {
+ } catch (Throwable e) {
saveChunkInError(e);
}
}
}
protected void buildGenericRecord(ChukwaRecord record, String body,
- long timestamp, String dataSource)
- {
+ long timestamp, String dataSource) {
calendar.setTimeInMillis(timestamp);
calendar.set(Calendar.MINUTE, 0);
calendar.set(Calendar.SECOND, 0);
@@ -107,8 +96,7 @@
+ timestamp);
key.setReduceType(dataSource);
- if (body != null)
- {
+ if (body != null) {
record.add(Record.bodyField, body);
}
record.setTime(timestamp);
@@ -119,8 +107,7 @@
}
- protected void reset(Chunk chunk)
- {
+ protected void reset(Chunk chunk) {
this.chunk = chunk;
this.bytes = chunk.getData();
this.recordOffsets = chunk.getRecordOffsets();
@@ -128,13 +115,11 @@
startOffset = 0;
}
- protected boolean hasNext()
- {
+ protected boolean hasNext() {
return (currentPos < recordOffsets.length);
}
- protected String nextLine()
- {
+ protected String nextLine() {
String log = new String(bytes, startOffset, (recordOffsets[currentPos]
- startOffset + 1));
startOffset = recordOffsets[currentPos] + 1;
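
hasNext()/nextLine() above treat recordOffsets[i] as the index of the last
byte of record i, so record i spans [previous offset + 1, recordOffsets[i]].
A standalone walk-through of that convention, with sample data invented for
illustration:

public class OffsetDemo {
  public static void main(String[] args) {
    byte[] bytes = "line1\nline2\nline3\n".getBytes();
    int[] recordOffsets = { 5, 11, 17 }; // index of each trailing '\n'
    int startOffset = 0;
    for (int pos = 0; pos < recordOffsets.length; pos++) {
      // Same slice arithmetic as nextLine(): length = last - start + 1
      String rec = new String(bytes, startOffset, recordOffsets[pos]
          - startOffset + 1);
      System.out.print(rec); // each record, trailing newline included
      startOffset = recordOffsets[pos] + 1;
    }
  }
}
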
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkProcessor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkProcessor.java Wed Mar 11 22:39:26 2009
@@ -18,14 +18,16 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
-public interface ChunkProcessor
-{
- public String getDataType();
- public void process(Chunk chunk,OutputCollector<Text, ChukwaRecord> output, Reporter reporter);
+public interface ChunkProcessor {
+ public String getDataType();
+
+ public void process(Chunk chunk, OutputCollector<Text, ChukwaRecord> output,
+ Reporter reporter);
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkSaver.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkSaver.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkSaver.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkSaver.java Wed Mar 11 22:39:26 2009
@@ -18,8 +18,8 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
-import java.util.Calendar;
+import java.util.Calendar;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
@@ -30,17 +30,15 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
-public class ChunkSaver
-{
+public class ChunkSaver {
static Logger log = Logger.getLogger(ChunkSaver.class);
+
public static ChukwaRecord saveChunk(Chunk chunk, Throwable throwable,
- OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
- {
- try
- {
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) {
+ try {
reporter.incrCounter("DemuxError", "count", 1);
reporter.incrCounter("DemuxError", chunk.getDataType() + "Count", 1);
-
+
ChukwaRecord record = new ChukwaRecord();
long ts = System.currentTimeMillis();
Calendar calendar = Calendar.getInstance();
@@ -68,22 +66,15 @@
output.collect(key, record);
return record;
- }
- catch (Throwable e)
- {
+ } catch (Throwable e) {
e.printStackTrace();
- try
- {
- log.warn("Unable to save a chunk: tags: "
- + chunk.getTags() + " - source:"
- + chunk.getSource() + " - dataType: "
- + chunk.getDataType() + " - Stream: "
- + chunk.getStreamName() + " - SeqId: "
- + chunk.getSeqID() + " - Data: "
- + new String(chunk.getData()) );
- }
- catch (Throwable e1)
- {
+ try {
+ log.warn("Unable to save a chunk: tags: " + chunk.getTags()
+ + " - source:" + chunk.getSource() + " - dataType: "
+ + chunk.getDataType() + " - Stream: " + chunk.getStreamName()
+ + " - SeqId: " + chunk.getSeqID() + " - Data: "
+ + new String(chunk.getData()));
+ } catch (Throwable e1) {
e.printStackTrace();
}
}
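
ChunkSaver increments "DemuxError"/"count" plus one counter per data type
before writing the failed chunk out as a record. Those counters can be read
back after the job through the old mapred API; a sketch with assumed wiring,
not shown in this commit:

import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;

public class DemuxErrorReport {
  static void report(JobConf jobConf) throws Exception {
    RunningJob job = JobClient.runJob(jobConf);
    Counters counters = job.getCounters();
    // One entry per data type, plus the aggregate "count" entry
    for (Counters.Counter c : counters.getGroup("DemuxError")) {
      System.out.println(c.getDisplayName() + " = " + c.getCounter());
    }
  }
}
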
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFInvalidRecord.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFInvalidRecord.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFInvalidRecord.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFInvalidRecord.java Wed Mar 11 22:39:26 2009
@@ -18,31 +18,27 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
-public class DFInvalidRecord extends Exception
-{
- /**
+public class DFInvalidRecord extends Exception {
+
+ /**
*
*/
- private static final long serialVersionUID = 1254238125122522523L;
+ private static final long serialVersionUID = 1254238125122522523L;
+
+ public DFInvalidRecord() {
+ }
- public DFInvalidRecord()
- {
- }
-
- public DFInvalidRecord(String arg0)
- {
- super(arg0);
- }
-
- public DFInvalidRecord(Throwable arg0)
- {
- super(arg0);
- }
-
- public DFInvalidRecord(String arg0, Throwable arg1)
- {
- super(arg0, arg1);
- }
+ public DFInvalidRecord(String arg0) {
+ super(arg0);
+ }
+
+ public DFInvalidRecord(Throwable arg0) {
+ super(arg0);
+ }
+
+ public DFInvalidRecord(String arg0, Throwable arg1) {
+ super(arg0, arg1);
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DebugOutputProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DebugOutputProcessor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DebugOutputProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DebugOutputProcessor.java Wed Mar 11 22:39:26 2009
@@ -18,39 +18,34 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
-import java.io.IOException;
+import java.io.IOException;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
-public class DebugOutputProcessor extends AbstractProcessor
-{
- static Logger log = Logger.getLogger(DebugOutputProcessor.class);
- public static final String recordType = "Debug";
-
- @Override
- public void parse(String line, OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
- Reporter reporter)
- {
- log.info("record: [" + line + "] type[" + chunk.getDataType() + "]");
-
- ChukwaRecord record = new ChukwaRecord();
- buildGenericRecord(record,line, System.currentTimeMillis(),recordType);
- key.setKey("" + chunk.getSeqID());
- try
- {
- output.collect(key, record);
- } catch (IOException e)
- {
- e.printStackTrace();
- }
- }
-
- public String getDataType()
- {
- return DebugOutputProcessor.recordType;
- }
+public class DebugOutputProcessor extends AbstractProcessor {
+ static Logger log = Logger.getLogger(DebugOutputProcessor.class);
+ public static final String recordType = "Debug";
+
+ @Override
+ public void parse(String line,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) {
+ log.info("record: [" + line + "] type[" + chunk.getDataType() + "]");
+
+ ChukwaRecord record = new ChukwaRecord();
+ buildGenericRecord(record, line, System.currentTimeMillis(), recordType);
+ key.setKey("" + chunk.getSeqID());
+ try {
+ output.collect(key, record);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public String getDataType() {
+ return DebugOutputProcessor.recordType;
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DefaultProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DefaultProcessor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DefaultProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DefaultProcessor.java Wed Mar 11 22:39:26 2009
@@ -1,32 +1,28 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
-import java.io.IOException;
+import java.io.IOException;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
-public class DefaultProcessor extends AbstractProcessor
-{
- static Logger log = Logger.getLogger(DefaultProcessor.class);
- @Override
- protected void parse(String recordEntry,
- OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
- Reporter reporter)
- {
- try
- {
- ChukwaRecord record = new ChukwaRecord();
- this.buildGenericRecord(record, recordEntry, archiveKey.getTimePartition(), chunk.getDataType());
- output.collect(key, record);
- }
- catch (IOException e)
- {
- log.warn("Unable to collect output in DefaultProcessor ["
- + recordEntry + "]", e);
- e.printStackTrace();
- }
- }
+public class DefaultProcessor extends AbstractProcessor {
+ static Logger log = Logger.getLogger(DefaultProcessor.class);
+
+ @Override
+ protected void parse(String recordEntry,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) {
+ try {
+ ChukwaRecord record = new ChukwaRecord();
+ this.buildGenericRecord(record, recordEntry, archiveKey
+ .getTimePartition(), chunk.getDataType());
+ output.collect(key, record);
+ } catch (IOException e) {
+ log.warn("Unable to collect output in DefaultProcessor [" + recordEntry
+ + "]", e);
+ e.printStackTrace();
+ }
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Df.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Df.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Df.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Df.java Wed Mar 11 22:39:26 2009
@@ -18,107 +18,97 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
-
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
-public class Df extends AbstractProcessor
-{
- static Logger log = Logger.getLogger(Df.class);
- private static final String[] headerSplitCols = { "Filesystem", "1K-blocks",
- "Used", "Available", "Use%", "Mounted", "on" };
- private static final String[] headerCols = { "Filesystem", "1K-blocks",
- "Used", "Available", "Use%", "Mounted on" };
- private SimpleDateFormat sdf = null;
-
- public Df()
- {
- sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
- }
-
- @Override
- protected void parse(String recordEntry,
- OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
- Reporter reporter)
- throws Throwable
- {
-
- try
- {
- String dStr = recordEntry.substring(0, 23);
- int start = 24;
- int idx = recordEntry.indexOf(' ', start);
- // String level = recordEntry.substring(start, idx);
- start = idx + 1;
- idx = recordEntry.indexOf(' ', start);
- // String className = recordEntry.substring(start, idx-1);
- String body = recordEntry.substring(idx + 1);
-
- Date d = sdf.parse(dStr);
- String[] lines = body.split("\n");
-
- String[] outputCols = lines[0].split("[\\s]++");
-
- if (outputCols.length != headerSplitCols.length
- || outputCols[0].intern() != headerSplitCols[0].intern()
- || outputCols[1].intern() != headerSplitCols[1].intern()
- || outputCols[2].intern() != headerSplitCols[2].intern()
- || outputCols[3].intern() != headerSplitCols[3].intern()
- || outputCols[4].intern() != headerSplitCols[4].intern()
- || outputCols[5].intern() != headerSplitCols[5].intern()
- || outputCols[6].intern() != headerSplitCols[6].intern()
- )
- {
- throw new DFInvalidRecord("Wrong output format (header) ["
- + recordEntry + "]");
- }
-
- String[] values = null;
-
- // Data
- ChukwaRecord record = null;
-
- for (int i = 1; i < lines.length; i++)
- {
- values = lines[i].split("[\\s]++");
- key = new ChukwaRecordKey();
- record = new ChukwaRecord();
- this.buildGenericRecord(record, null, d.getTime(), "Df");
-
- record.add(headerCols[0], values[0]);
- record.add(headerCols[1], values[1]);
- record.add(headerCols[2], values[2]);
- record.add(headerCols[3], values[3]);
- record.add(headerCols[4], values[4].substring(0, values[4].length()-1)); // Remove %
- record.add(headerCols[5], values[5]);
-
- output.collect(key, record);
- }
-
- //log.info("DFProcessor output 1 DF record");
- } catch (ParseException e)
- {
- e.printStackTrace();
- log.warn("Wrong format in DFProcessor [" + recordEntry + "]", e);
- throw e;
- } catch (IOException e)
- {
- e.printStackTrace();
- log.warn("Unable to collect output in DFProcessor [" + recordEntry
- + "]", e);
- throw e;
- } catch (DFInvalidRecord e)
- {
- e.printStackTrace();
- log.warn("Wrong format in DFProcessor [" + recordEntry + "]", e);
- throw e;
- }
- }
+public class Df extends AbstractProcessor {
+ static Logger log = Logger.getLogger(Df.class);
+ private static final String[] headerSplitCols = { "Filesystem", "1K-blocks",
+ "Used", "Available", "Use%", "Mounted", "on" };
+ private static final String[] headerCols = { "Filesystem", "1K-blocks",
+ "Used", "Available", "Use%", "Mounted on" };
+ private SimpleDateFormat sdf = null;
+
+ public Df() {
+ sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
+ }
+
+ @Override
+ protected void parse(String recordEntry,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+ throws Throwable {
+
+ try {
+ String dStr = recordEntry.substring(0, 23);
+ int start = 24;
+ int idx = recordEntry.indexOf(' ', start);
+ // String level = recordEntry.substring(start, idx);
+ start = idx + 1;
+ idx = recordEntry.indexOf(' ', start);
+ // String className = recordEntry.substring(start, idx-1);
+ String body = recordEntry.substring(idx + 1);
+
+ Date d = sdf.parse(dStr);
+ String[] lines = body.split("\n");
+
+ String[] outputCols = lines[0].split("[\\s]++");
+
+ if (outputCols.length != headerSplitCols.length
+ || outputCols[0].intern() != headerSplitCols[0].intern()
+ || outputCols[1].intern() != headerSplitCols[1].intern()
+ || outputCols[2].intern() != headerSplitCols[2].intern()
+ || outputCols[3].intern() != headerSplitCols[3].intern()
+ || outputCols[4].intern() != headerSplitCols[4].intern()
+ || outputCols[5].intern() != headerSplitCols[5].intern()
+ || outputCols[6].intern() != headerSplitCols[6].intern()) {
+ throw new DFInvalidRecord("Wrong output format (header) ["
+ + recordEntry + "]");
+ }
+
+ String[] values = null;
+
+ // Data
+ ChukwaRecord record = null;
+
+ for (int i = 1; i < lines.length; i++) {
+ values = lines[i].split("[\\s]++");
+ key = new ChukwaRecordKey();
+ record = new ChukwaRecord();
+ this.buildGenericRecord(record, null, d.getTime(), "Df");
+
+ record.add(headerCols[0], values[0]);
+ record.add(headerCols[1], values[1]);
+ record.add(headerCols[2], values[2]);
+ record.add(headerCols[3], values[3]);
+ record.add(headerCols[4], values[4]
+ .substring(0, values[4].length() - 1)); // Remove %
+ record.add(headerCols[5], values[5]);
+
+ output.collect(key, record);
+ }
+
+ // log.info("DFProcessor output 1 DF record");
+ } catch (ParseException e) {
+ e.printStackTrace();
+ log.warn("Wrong format in DFProcessor [" + recordEntry + "]", e);
+ throw e;
+ } catch (IOException e) {
+ e.printStackTrace();
+ log.warn("Unable to collect output in DFProcessor [" + recordEntry + "]",
+ e);
+ throw e;
+ } catch (DFInvalidRecord e) {
+ e.printStackTrace();
+ log.warn("Wrong format in DFProcessor [" + recordEntry + "]", e);
+ throw e;
+ }
+ }
}
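
In Df.parse, the header line splits into seven whitespace tokens because
"Mounted on" contains a space, which is why headerSplitCols (seven entries)
and headerCols (six) differ; data rows split into six tokens, and the Use%
value has its trailing percent sign stripped. A standalone illustration with
made-up numbers:

public class DfSplitDemo {
  public static void main(String[] args) {
    String body = "Filesystem 1K-blocks Used Available Use% Mounted on\n"
        + "/dev/sda1 480618344 42462332 413740388 10% /";
    String[] lines = body.split("\n");
    // "[\\s]++" is the same possessive whitespace split Df uses
    String[] values = lines[1].split("[\\s]++");
    // values: /dev/sda1, 480618344, 42462332, 413740388, 10%, /
    String usePercent = values[4].substring(0, values[4].length() - 1);
    System.out.println("Use% without the percent sign: " + usePercent); // 10
  }
}
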
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DuplicateProcessorException.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DuplicateProcessorException.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DuplicateProcessorException.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DuplicateProcessorException.java Wed Mar 11 22:39:26 2009
@@ -18,30 +18,27 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
-public class DuplicateProcessorException extends RuntimeException
-{
- /**
+public class DuplicateProcessorException extends RuntimeException {
+
+ /**
*
*/
- private static final long serialVersionUID = 3890267797961057789L;
+ private static final long serialVersionUID = 3890267797961057789L;
- public DuplicateProcessorException()
- {}
+ public DuplicateProcessorException() {
+ }
- public DuplicateProcessorException(String message)
- {
- super(message);
- }
-
- public DuplicateProcessorException(Throwable cause)
- {
- super(cause);
- }
-
- public DuplicateProcessorException(String message, Throwable cause)
- {
- super(message, cause);
- }
+ public DuplicateProcessorException(String message) {
+ super(message);
+ }
+
+ public DuplicateProcessorException(Throwable cause) {
+ super(cause);
+ }
+
+ public DuplicateProcessorException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopLogProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopLogProcessor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopLogProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopLogProcessor.java Wed Mar 11 22:39:26 2009
@@ -18,79 +18,66 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
-
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
+public class HadoopLogProcessor extends AbstractProcessor {
+ static Logger log = Logger.getLogger(HadoopLogProcessor.class);
-
-public class HadoopLogProcessor extends AbstractProcessor
-{
- static Logger log = Logger.getLogger(HadoopLogProcessor.class);
-
- private static final String recordType = "HadoopLog";
- private static final String nameNodeType = "NameNode";
- private static final String dataNodeType = "DataNode";
- private static final String auditType = "Audit";
- private SimpleDateFormat sdf = null;
-
-
- public HadoopLogProcessor()
- {
- //TODO move that to config
- sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
- }
-
- @Override
- public void parse(String recordEntry, OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
- Reporter reporter)
- throws Throwable
- {
- try
- {
- String dStr = recordEntry.substring(0, 23);
- Date d = sdf.parse(dStr);
- ChukwaRecord record = new ChukwaRecord();
-
- if (this.chunk.getStreamName().indexOf("datanode") > 0) {
- buildGenericRecord(record,recordEntry,d.getTime(),dataNodeType);
- } else if (this.chunk.getStreamName().indexOf("namenode") > 0) {
- buildGenericRecord(record,recordEntry,d.getTime(),nameNodeType);
- } else if (this.chunk.getStreamName().indexOf("audit") > 0) {
- buildGenericRecord(record,recordEntry,d.getTime(),auditType);
- } else {
- buildGenericRecord(record,recordEntry,d.getTime(),recordType);
- }
-
-
- output.collect(key, record);
- }
- catch (ParseException e)
- {
- log.warn("Unable to parse the date in DefaultProcessor ["
- + recordEntry + "]", e);
- e.printStackTrace();
- throw e;
- }
- catch (IOException e)
- {
- log.warn("Unable to collect output in DefaultProcessor ["
- + recordEntry + "]", e);
- e.printStackTrace();
- throw e;
- }
- }
-
- public String getDataType()
- {
- return HadoopLogProcessor.recordType;
- }
+ private static final String recordType = "HadoopLog";
+ private static final String nameNodeType = "NameNode";
+ private static final String dataNodeType = "DataNode";
+ private static final String auditType = "Audit";
+ private SimpleDateFormat sdf = null;
+
+ public HadoopLogProcessor() {
+ // TODO move that to config
+ sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+ }
+
+ @Override
+ public void parse(String recordEntry,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+ throws Throwable {
+ try {
+ String dStr = recordEntry.substring(0, 23);
+ Date d = sdf.parse(dStr);
+ ChukwaRecord record = new ChukwaRecord();
+
+ if (this.chunk.getStreamName().indexOf("datanode") > 0) {
+ buildGenericRecord(record, recordEntry, d.getTime(), dataNodeType);
+ } else if (this.chunk.getStreamName().indexOf("namenode") > 0) {
+ buildGenericRecord(record, recordEntry, d.getTime(), nameNodeType);
+ } else if (this.chunk.getStreamName().indexOf("audit") > 0) {
+ buildGenericRecord(record, recordEntry, d.getTime(), auditType);
+ } else {
+ buildGenericRecord(record, recordEntry, d.getTime(), recordType);
+ }
+
+ output.collect(key, record);
+ } catch (ParseException e) {
+ log.warn("Unable to parse the date in DefaultProcessor [" + recordEntry
+ + "]", e);
+ e.printStackTrace();
+ throw e;
+ } catch (IOException e) {
+ log.warn("Unable to collect output in DefaultProcessor [" + recordEntry
+ + "]", e);
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ public String getDataType() {
+ return HadoopLogProcessor.recordType;
+ }
}
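
HadoopLogProcessor assumes the default log4j date layout, so the first 23
characters of each line parse as "yyyy-MM-dd HH:mm:ss,SSS", after which the
record type is chosen from the stream name. A minimal check of that
23-character assumption, with an invented sample line:

import java.text.SimpleDateFormat;
import java.util.Date;

public class Log4jDatePrefixDemo {
  public static void main(String[] args) throws Exception {
    String line = "2009-03-11 22:39:26,123 INFO org.example.Foo: started";
    // "2009-03-11 22:39:26,123" is exactly 23 characters
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
    Date d = sdf.parse(line.substring(0, 23));
    System.out.println(d);
  }
}
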
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java Wed Mar 11 22:39:26 2009
@@ -18,13 +18,13 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Iterator;
-
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
@@ -35,86 +35,86 @@
public class HadoopMetricsProcessor extends AbstractProcessor {
- static Logger log = Logger.getLogger(HadoopMetricsProcessor.class);
- static final String chukwaTimestampField = "chukwa_timestamp";
- static final String contextNameField = "contextName";
- static final String recordNameField = "recordName";
-
- private SimpleDateFormat sdf = null;
-
- public HadoopMetricsProcessor() {
- // TODO move that to config
- sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
- }
-
- @SuppressWarnings("unchecked")
- @Override
- protected void parse(String recordEntry,
- OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
- Reporter reporter) throws Throwable {
- try {
- String dStr = recordEntry.substring(0, 23);
- int start = 24;
- int idx = recordEntry.indexOf(' ', start);
- // String level = recordEntry.substring(start, idx);
- start = idx + 1;
- idx = recordEntry.indexOf(' ', start);
- // String className = recordEntry.substring(start, idx-1);
- String body = recordEntry.substring(idx + 1);
- body.replaceAll("\n", "");
- // log.info("record [" + recordEntry + "] body [" + body +"]");
- Date d = sdf.parse(dStr);
-
- JSONObject json = new JSONObject(body);
-
- ChukwaRecord record = new ChukwaRecord();
- String datasource = null;
- String recordName = null;
-
- Iterator<String> ki = json.keys();
- while (ki.hasNext()) {
- String keyName = ki.next();
- if (chukwaTimestampField.intern() == keyName.intern()) {
- d = new Date(json.getLong(keyName));
- Calendar cal = Calendar.getInstance();
- cal.setTimeInMillis(d.getTime());
- cal.set(Calendar.SECOND, 0);
- cal.set(Calendar.MILLISECOND, 0);
- d.setTime(cal.getTimeInMillis());
- } else if (contextNameField.intern() == keyName.intern()) {
- datasource = "Hadoop_" + json.getString(keyName);
- } else if (recordNameField.intern() == keyName.intern()) {
- recordName = json.getString(keyName);
- record.add(keyName, json.getString(keyName));
- } else {
- record.add(keyName, json.getString(keyName));
- }
- }
-
- datasource = datasource + "_" + recordName;
- buildGenericRecord(record, null, d.getTime(), datasource);
- output.collect(key, record);
- } catch (ParseException e) {
- e.printStackTrace();
- log.warn("Wrong format in HadoopMetricsProcessor [" + recordEntry
- + "]", e);
- throw e;
- } catch (IOException e) {
- e.printStackTrace();
- log.warn("Unable to collect output in HadoopMetricsProcessor ["
- + recordEntry + "]", e);
- throw e;
- } catch (JSONException e) {
- e.printStackTrace();
- log.warn("Wrong format in HadoopMetricsProcessor [" + recordEntry
- + "]", e);
- throw e;
- }
-
- }
-
- public String getDataType() {
- return HadoopMetricsProcessor.class.getName();
- }
+ static Logger log = Logger.getLogger(HadoopMetricsProcessor.class);
+ static final String chukwaTimestampField = "chukwa_timestamp";
+ static final String contextNameField = "contextName";
+ static final String recordNameField = "recordName";
+
+ private SimpleDateFormat sdf = null;
+
+ public HadoopMetricsProcessor() {
+ // TODO move that to config
+ sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected void parse(String recordEntry,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+ throws Throwable {
+ try {
+ String dStr = recordEntry.substring(0, 23);
+ int start = 24;
+ int idx = recordEntry.indexOf(' ', start);
+ // String level = recordEntry.substring(start, idx);
+ start = idx + 1;
+ idx = recordEntry.indexOf(' ', start);
+ // String className = recordEntry.substring(start, idx-1);
+ String body = recordEntry.substring(idx + 1);
+      body = body.replaceAll("\n", ""); // String is immutable; keep the result
+ // log.info("record [" + recordEntry + "] body [" + body +"]");
+ Date d = sdf.parse(dStr);
+
+ JSONObject json = new JSONObject(body);
+
+ ChukwaRecord record = new ChukwaRecord();
+ String datasource = null;
+ String recordName = null;
+
+ Iterator<String> ki = json.keys();
+ while (ki.hasNext()) {
+ String keyName = ki.next();
+        if (chukwaTimestampField.equals(keyName)) {
+          d = new Date(json.getLong(keyName));
+          Calendar cal = Calendar.getInstance();
+          cal.setTimeInMillis(d.getTime());
+          cal.set(Calendar.SECOND, 0);
+          cal.set(Calendar.MILLISECOND, 0);
+          d.setTime(cal.getTimeInMillis());
+        } else if (contextNameField.equals(keyName)) {
+          datasource = "Hadoop_" + json.getString(keyName);
+        } else if (recordNameField.equals(keyName)) {
+ recordName = json.getString(keyName);
+ record.add(keyName, json.getString(keyName));
+ } else {
+ record.add(keyName, json.getString(keyName));
+ }
+ }
+
+ datasource = datasource + "_" + recordName;
+ buildGenericRecord(record, null, d.getTime(), datasource);
+ output.collect(key, record);
+ } catch (ParseException e) {
+ e.printStackTrace();
+ log.warn("Wrong format in HadoopMetricsProcessor [" + recordEntry + "]",
+ e);
+ throw e;
+ } catch (IOException e) {
+ e.printStackTrace();
+ log.warn("Unable to collect output in HadoopMetricsProcessor ["
+ + recordEntry + "]", e);
+ throw e;
+ } catch (JSONException e) {
+ e.printStackTrace();
+ log.warn("Wrong format in HadoopMetricsProcessor [" + recordEntry + "]",
+ e);
+ throw e;
+ }
+
+ }
+
+ public String getDataType() {
+ return HadoopMetricsProcessor.class.getName();
+ }
}
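The chukwa_timestamp branch above floors each metric's time to the start of
its minute, so records emitted within the same minute collate under one key.
A standalone sketch of that rounding (the literal timestamp is arbitrary):

import java.util.Calendar;
import java.util.Date;

public class MinuteFloorSketch {
  public static void main(String[] args) {
    Date d = new Date(1236811166789L); // an arbitrary chukwa_timestamp value
    Calendar cal = Calendar.getInstance();
    cal.setTimeInMillis(d.getTime());
    cal.set(Calendar.SECOND, 0);      // zero out seconds
    cal.set(Calendar.MILLISECOND, 0); // and milliseconds
    d.setTime(cal.getTimeInMillis());
    System.out.println(d); // aligned to the start of its minute
  }
}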
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java Wed Mar 11 22:39:26 2009
@@ -18,124 +18,118 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.log4j.Logger;
-public class Iostat extends AbstractProcessor
-{
- static Logger log = Logger.getLogger(Iostat.class);
- public final String recordType = this.getClass().getName();
-
- private static String regex="([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}) (.*?) (.*?): (.*?) \\((.*?)\\)";
- private static Pattern p = null;
-
- private Matcher matcher = null;
- private SimpleDateFormat sdf = null;
-
- public Iostat()
- {
- //TODO move that to config
- sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
- p = Pattern.compile(regex);
- }
-
- @Override
- protected void parse(String recordEntry, OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
- Reporter reporter)
- throws Throwable
- {
-
- log.debug("Iostat record: [" + recordEntry + "] type[" + chunk.getDataType() + "]");
- int i = 0;
-
- matcher=p.matcher(recordEntry);
- while (matcher.find())
- {
- log.debug("Iostat Processor Matches");
-
- try
- {
- Date d = sdf.parse( matcher.group(1).trim());
-
-
-
- String[] lines = recordEntry.split("\n");
- String[] headers = null;
- for(int skip=0;skip<2;skip++) {
- i++;
- while ( i < lines.length && lines[i].indexOf("avg-cpu")<0) {
- // Skip the first output because the numbers are averaged from system boot up
- log.debug("skip line:"+lines[i]);
- i++;
- }
- }
- while (i < lines.length)
- {
- ChukwaRecord record = null;
-
- if(lines[i].indexOf("avg-cpu")>=0 || lines[i].indexOf("Device")>=0) {
- headers = parseHeader(lines[i]);
- i++;
- }
- String data[] = parseData(lines[i]);
- if(headers[0].equals("avg-cpu:")) {
- log.debug("Matched CPU-Utilization");
- record = new ChukwaRecord();
- key = new ChukwaRecordKey();
- buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
- } else if(headers[0].equals("Device:")) {
- log.debug("Matched Iostat");
- record = new ChukwaRecord();
- key = new ChukwaRecordKey();
- buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
- } else {
- log.debug("No match:"+headers[0]);
- }
- if(record!=null) {
- int j=0;
- log.debug("Data Length: " + data.length);
- while(j<data.length) {
- log.debug("header:"+headers[j]+" data:"+data[j]);
- if(!headers[j].equals("avg-cpu:")) {
- record.add(headers[j],data[j]);
- }
- j++;
- }
- record.setTime(d.getTime());
- if(data.length>3) {
- output.collect(key, record);
- }
- }
- i++;
- }
- // End of parsing
- } catch (Exception e)
- {
- e.printStackTrace();
- throw e;
- }
- }
- }
-
- public String[] parseHeader(String header) {
- String[] headers = header.split("\\s+");
- return headers;
- }
-
- public String[] parseData(String dataLine) {
- String[] data = dataLine.split("\\s+");
- return data;
- }
-
- public String getDataType() {
- return recordType;
- }
+public class Iostat extends AbstractProcessor {
+ static Logger log = Logger.getLogger(Iostat.class);
+ public final String recordType = this.getClass().getName();
+
+ private static String regex = "([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}) (.*?) (.*?): (.*?) \\((.*?)\\)";
+ private static Pattern p = null;
+
+ private Matcher matcher = null;
+ private SimpleDateFormat sdf = null;
+
+ public Iostat() {
+ // TODO move that to config
+ sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
+ p = Pattern.compile(regex);
+ }
+
+ @Override
+ protected void parse(String recordEntry,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+ throws Throwable {
+
+ log.debug("Iostat record: [" + recordEntry + "] type["
+ + chunk.getDataType() + "]");
+ int i = 0;
+
+ matcher = p.matcher(recordEntry);
+ while (matcher.find()) {
+ log.debug("Iostat Processor Matches");
+
+ try {
+ Date d = sdf.parse(matcher.group(1).trim());
+
+ String[] lines = recordEntry.split("\n");
+ String[] headers = null;
+ for (int skip = 0; skip < 2; skip++) {
+ i++;
+ while (i < lines.length && lines[i].indexOf("avg-cpu") < 0) {
+ // Skip the first output because the numbers are averaged from
+ // system boot up
+ log.debug("skip line:" + lines[i]);
+ i++;
+ }
+ }
+ while (i < lines.length) {
+ ChukwaRecord record = null;
+
+ if (lines[i].indexOf("avg-cpu") >= 0
+ || lines[i].indexOf("Device") >= 0) {
+ headers = parseHeader(lines[i]);
+ i++;
+ }
+          String[] data = parseData(lines[i]);
+ if (headers[0].equals("avg-cpu:")) {
+ log.debug("Matched CPU-Utilization");
+ record = new ChukwaRecord();
+ key = new ChukwaRecordKey();
+ buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
+ } else if (headers[0].equals("Device:")) {
+ log.debug("Matched Iostat");
+ record = new ChukwaRecord();
+ key = new ChukwaRecordKey();
+ buildGenericRecord(record, null, d.getTime(), "SystemMetrics");
+ } else {
+ log.debug("No match:" + headers[0]);
+ }
+ if (record != null) {
+ int j = 0;
+ log.debug("Data Length: " + data.length);
+ while (j < data.length) {
+ log.debug("header:" + headers[j] + " data:" + data[j]);
+ if (!headers[j].equals("avg-cpu:")) {
+ record.add(headers[j], data[j]);
+ }
+ j++;
+ }
+ record.setTime(d.getTime());
+ if (data.length > 3) {
+ output.collect(key, record);
+ }
+ }
+ i++;
+ }
+ // End of parsing
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+ }
+
+ public String[] parseHeader(String header) {
+ String[] headers = header.split("\\s+");
+ return headers;
+ }
+
+ public String[] parseData(String dataLine) {
+ String[] data = dataLine.split("\\s+");
+ return data;
+ }
+
+ public String getDataType() {
+ return recordType;
+ }
}
\ No newline at end of file
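Iostat pairs each whitespace-split header token with the value at the same
index in the following data line (only the "avg-cpu:" token itself is skipped
when adding fields). A standalone sketch of that pairing, with invented
sample lines:

public class IostatSplitSketch {
  public static void main(String[] args) {
    // Header and data lines shaped like iostat output; values are invented.
    String header = "Device:            tps   Blk_read/s   Blk_wrtn/s";
    String data   = "sda               3.50       120.40        88.10";
    String[] h = header.split("\\s+");
    String[] v = data.split("\\s+");
    for (int j = 0; j < v.length && j < h.length; j++) {
      System.out.println(h[j] + " = " + v[j]); // e.g. "tps = 3.50"
    }
  }
}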
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java Wed Mar 11 22:39:26 2009
@@ -1,15 +1,14 @@
package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
+
import java.io.File;
import java.io.FileOutputStream;
import java.util.Calendar;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
-
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.OutputCollector;
@@ -22,99 +21,101 @@
import org.w3c.dom.Text;
public class JobConfProcessor extends AbstractProcessor {
- static Logger log = Logger.getLogger(JobConfProcessor.class);
- static Pattern timePattern = Pattern.compile("(.*)?time=\"(.*?)\"(.*)?");
- static Pattern hodPattern = Pattern.compile("(.*?)/(.*?)\\.(\\d+)\\.(.*?)\\.hodring/(.*?)");
- static Pattern jobPattern = Pattern.compile("(.*?)job_(.*?)_conf\\.xml(.*?)");
- @Override
- protected void parse(String recordEntry,
- OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
- Reporter reporter)
- throws Throwable
- {
- Long time = 0L;
- Random randomNumber = new Random();
- String tags = this.chunk.getTags();
-
- Matcher matcher = timePattern.matcher(tags);
- if (matcher.matches()) {
- time = Long.parseLong(matcher.group(2));
- }
- String capp = this.chunk.getApplication();
- String hodID = "";
- String jobID = "";
- matcher = hodPattern.matcher(capp);
- if(matcher.matches()) {
- hodID=matcher.group(3);
+ static Logger log = Logger.getLogger(JobConfProcessor.class);
+ static Pattern timePattern = Pattern.compile("(.*)?time=\"(.*?)\"(.*)?");
+ static Pattern hodPattern = Pattern
+ .compile("(.*?)/(.*?)\\.(\\d+)\\.(.*?)\\.hodring/(.*?)");
+ static Pattern jobPattern = Pattern.compile("(.*?)job_(.*?)_conf\\.xml(.*?)");
+
+ @Override
+ protected void parse(String recordEntry,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+ throws Throwable {
+ Long time = 0L;
+ Random randomNumber = new Random();
+ String tags = this.chunk.getTags();
+
+ Matcher matcher = timePattern.matcher(tags);
+ if (matcher.matches()) {
+ time = Long.parseLong(matcher.group(2));
+ }
+ String capp = this.chunk.getApplication();
+ String hodID = "";
+ String jobID = "";
+ matcher = hodPattern.matcher(capp);
+ if (matcher.matches()) {
+ hodID = matcher.group(3);
+ }
+ matcher = jobPattern.matcher(capp);
+ if (matcher.matches()) {
+ jobID = matcher.group(2);
+ }
+ ChukwaRecord record = new ChukwaRecord();
+ DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory
+ .newInstance();
+ // ignore all comments inside the xml file
+ docBuilderFactory.setIgnoringComments(true);
+ try {
+ DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
+ Document doc = null;
+ String fileName = "test_" + randomNumber.nextInt();
+ File tmp = new File(fileName);
+ FileOutputStream out = new FileOutputStream(tmp);
+ out.write(recordEntry.getBytes());
+ out.close();
+ doc = builder.parse(fileName);
+ Element root = doc.getDocumentElement();
+ if (!"configuration".equals(root.getTagName()))
+ log.fatal("bad conf file: top-level element not <configuration>");
+ NodeList props = root.getChildNodes();
+
+ for (int i = 0; i < props.getLength(); i++) {
+ Node propNode = props.item(i);
+ if (!(propNode instanceof Element))
+ continue;
+ Element prop = (Element) propNode;
+ if (!"property".equals(prop.getTagName()))
+ log.warn("bad conf file: element not <property>");
+ NodeList fields = prop.getChildNodes();
+ String attr = null;
+ String value = null;
+ boolean finalParameter = false;
+ for (int j = 0; j < fields.getLength(); j++) {
+ Node fieldNode = fields.item(j);
+ if (!(fieldNode instanceof Element))
+ continue;
+ Element field = (Element) fieldNode;
+ if ("name".equals(field.getTagName()) && field.hasChildNodes())
+ attr = ((Text) field.getFirstChild()).getData().trim();
+ if ("value".equals(field.getTagName()) && field.hasChildNodes())
+ value = ((Text) field.getFirstChild()).getData();
+ if ("final".equals(field.getTagName()) && field.hasChildNodes())
+ finalParameter = "true".equals(((Text) field.getFirstChild())
+ .getData());
}
- matcher = jobPattern.matcher(capp);
- if(matcher.matches()) {
- jobID=matcher.group(2);
+
+        // Add every name/value pair found; the 'final' flag is parsed
+        // above but not otherwise used by this processor.
+ if (attr != null && value != null) {
+ record.add(attr, value);
}
- ChukwaRecord record = new ChukwaRecord();
- DocumentBuilderFactory docBuilderFactory
- = DocumentBuilderFactory.newInstance();
- //ignore all comments inside the xml file
- docBuilderFactory.setIgnoringComments(true);
- try {
- DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
- Document doc = null;
- String fileName = "test_"+randomNumber.nextInt();
- File tmp = new File(fileName);
- FileOutputStream out = new FileOutputStream(tmp);
- out.write(recordEntry.getBytes());
- out.close();
- doc = builder.parse(fileName);
- Element root = doc.getDocumentElement();
- if (!"configuration".equals(root.getTagName()))
- log.fatal("bad conf file: top-level element not <configuration>");
- NodeList props = root.getChildNodes();
-
- for (int i = 0; i < props.getLength(); i++) {
- Node propNode = props.item(i);
- if (!(propNode instanceof Element))
- continue;
- Element prop = (Element)propNode;
- if (!"property".equals(prop.getTagName()))
- log.warn("bad conf file: element not <property>");
- NodeList fields = prop.getChildNodes();
- String attr = null;
- String value = null;
- boolean finalParameter = false;
- for (int j = 0; j < fields.getLength(); j++) {
- Node fieldNode = fields.item(j);
- if (!(fieldNode instanceof Element))
- continue;
- Element field = (Element)fieldNode;
- if ("name".equals(field.getTagName()) && field.hasChildNodes())
- attr = ((Text)field.getFirstChild()).getData().trim();
- if ("value".equals(field.getTagName()) && field.hasChildNodes())
- value = ((Text)field.getFirstChild()).getData();
- if ("final".equals(field.getTagName()) && field.hasChildNodes())
- finalParameter = "true".equals(((Text)field.getFirstChild()).getData());
- }
-
- // Ignore this parameter if it has already been marked as 'final'
- if (attr != null && value != null) {
- record.add(attr, value);
- }
- }
- buildGenericRecord(record,null, time,"JobConf");
- calendar.setTimeInMillis(time);
- calendar.set(Calendar.MINUTE, 0);
- calendar.set(Calendar.SECOND, 0);
- calendar.set(Calendar.MILLISECOND, 0);
- key.setKey(""+ calendar.getTimeInMillis() + "/" + hodID +"." + jobID + "/" + time);
-
- output.collect(key,record);
- tmp.delete();
- } catch(Exception e) {
- e.printStackTrace();
- throw e;
- }
- }
-
- public String getDataType() {
- return Torque.class.getName();
- }
+ }
+ buildGenericRecord(record, null, time, "JobConf");
+ calendar.setTimeInMillis(time);
+ calendar.set(Calendar.MINUTE, 0);
+ calendar.set(Calendar.SECOND, 0);
+ calendar.set(Calendar.MILLISECOND, 0);
+ key.setKey("" + calendar.getTimeInMillis() + "/" + hodID + "." + jobID
+ + "/" + time);
+
+ output.collect(key, record);
+ tmp.delete();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ public String getDataType() {
+    return JobConfProcessor.class.getName();
+ }
}
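JobConfProcessor extracts name/value pairs from Hadoop's <configuration> XML.
A standalone sketch of the same DOM traversal, parsing from an in-memory
string rather than the temp file the processor writes, and using
getElementsByTagName as a shortcut for the child-node walk above:

import java.io.StringReader;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.w3c.dom.Text;
import org.xml.sax.InputSource;

public class ConfParseSketch {
  public static void main(String[] args) throws Exception {
    String xml = "<configuration><property>"
        + "<name>mapred.reduce.tasks</name><value>4</value>"
        + "</property></configuration>";
    DocumentBuilderFactory f = DocumentBuilderFactory.newInstance();
    f.setIgnoringComments(true); // same setting as the processor
    DocumentBuilder b = f.newDocumentBuilder();
    Document doc = b.parse(new InputSource(new StringReader(xml)));
    NodeList props = doc.getDocumentElement().getElementsByTagName("property");
    for (int i = 0; i < props.getLength(); i++) {
      Element prop = (Element) props.item(i);
      String name = ((Text) prop.getElementsByTagName("name").item(0)
          .getFirstChild()).getData().trim();
      String value = ((Text) prop.getElementsByTagName("value").item(0)
          .getFirstChild()).getData();
      System.out.println(name + " = " + value); // becomes record.add(name, value)
    }
  }
}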