Posted to commits@chukwa.apache.org by as...@apache.org on 2009/03/11 23:39:32 UTC
svn commit: r752666 [6/16] - in /hadoop/chukwa/trunk: ./
src/java/org/apache/hadoop/chukwa/ src/java/org/apache/hadoop/chukwa/conf/
src/java/org/apache/hadoop/chukwa/database/
src/java/org/apache/hadoop/chukwa/datacollection/
src/java/org/apache/hadoop...
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java Wed Mar 11 22:39:26 2009
@@ -18,6 +18,7 @@
package org.apache.hadoop.chukwa.extraction.database;
+
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
@@ -31,7 +32,6 @@
import java.util.Iterator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
@@ -46,316 +46,336 @@
import org.apache.hadoop.io.SequenceFile;
public class MetricDataLoader {
- private static Log log = LogFactory.getLog(MetricDataLoader.class);
- private static Connection conn = null;
- private static Statement stmt = null;
- private ResultSet rs = null;
- private static DatabaseConfig mdlConfig = null;
- private static HashMap<String, String> normalize = null;
- private static HashMap<String, String> transformer = null;
- private static HashMap<String, Float> conversion = null;
- private static HashMap<String, String> dbTables = null;
- private HashMap<String, HashMap<String,Integer>> dbSchema = null;
- private static String newSpace="-";
- private static boolean batchMode = true;
-
- /** Creates a new instance of DBWriter */
- public MetricDataLoader() {
- initEnv("");
+ private static Log log = LogFactory.getLog(MetricDataLoader.class);
+ private static Connection conn = null;
+ private static Statement stmt = null;
+ private ResultSet rs = null;
+ private static DatabaseConfig mdlConfig = null;
+ private static HashMap<String, String> normalize = null;
+ private static HashMap<String, String> transformer = null;
+ private static HashMap<String, Float> conversion = null;
+ private static HashMap<String, String> dbTables = null;
+ private HashMap<String, HashMap<String, Integer>> dbSchema = null;
+ private static String newSpace = "-";
+ private static boolean batchMode = true;
+
+ /** Creates a new instance of DBWriter */
+ public MetricDataLoader() {
+ initEnv("");
+ }
+
+ public MetricDataLoader(String cluster) {
+ initEnv(cluster);
+ }
+
+ private void initEnv(String cluster) {
+ mdlConfig = new DatabaseConfig();
+ transformer = mdlConfig.startWith("metric.");
+ conversion = new HashMap<String, Float>();
+ normalize = mdlConfig.startWith("normalize.");
+ dbTables = mdlConfig.startWith("report.db.name.");
+ Iterator<?> entries = mdlConfig.iterator();
+ while (entries.hasNext()) {
+ String entry = entries.next().toString();
+ if (entry.startsWith("conversion.")) {
+ String[] metrics = entry.split("=");
+ try {
+ float convertNumber = Float.parseFloat(metrics[1]);
+ conversion.put(metrics[0], convertNumber);
+ } catch (NumberFormatException ex) {
+ log.error(metrics[0] + " is not a number.");
+ }
+ }
}
-
- public MetricDataLoader(String cluster) {
- initEnv(cluster);
+ String jdbc_url = "";
+ log.debug("cluster name:" + cluster);
+ if (!cluster.equals("")) {
+ ClusterConfig cc = new ClusterConfig();
+ jdbc_url = cc.getURL(cluster);
}
-
- private void initEnv(String cluster){
- mdlConfig = new DatabaseConfig();
- transformer = mdlConfig.startWith("metric.");
- conversion = new HashMap<String, Float>();
- normalize = mdlConfig.startWith("normalize.");
- dbTables = mdlConfig.startWith("report.db.name.");
- Iterator<?> entries = mdlConfig.iterator();
- while(entries.hasNext()) {
- String entry = entries.next().toString();
- if(entry.startsWith("conversion.")) {
- String[] metrics = entry.split("=");
- try {
- float convertNumber = Float.parseFloat(metrics[1]);
- conversion.put(metrics[0],convertNumber);
- } catch (NumberFormatException ex) {
- log.error(metrics[0]+" is not a number.");
- }
- }
- }
- String jdbc_url = "";
- log.debug("cluster name:"+cluster);
- if(!cluster.equals("")) {
- ClusterConfig cc = new ClusterConfig();
- jdbc_url = cc.getURL(cluster);
- }
- try {
- // The newInstance() call is a work around for some
- // broken Java implementations
- String jdbcDriver = System.getenv("JDBC_DRIVER");
- Class.forName(jdbcDriver).newInstance();
- log.debug("Initialized JDBC URL: "+jdbc_url);
- } catch (Exception ex) {
- // handle the error
- log.error(ex,ex);
- }
- try {
- conn = DriverManager.getConnection(jdbc_url);
- HashMap<String, String> dbNames = mdlConfig.startWith("report.db.name.");
- Iterator<String> ki = dbNames.keySet().iterator();
- dbSchema = new HashMap<String, HashMap<String,Integer>>();
- DatabaseWriter dbWriter = new DatabaseWriter(cluster);
- while(ki.hasNext()) {
- String table = dbNames.get(ki.next().toString());
- String query = "select * from "+table+"_template limit 1";
- try {
- ResultSet rs = dbWriter.query(query);
- ResultSetMetaData rmeta = rs.getMetaData();
- HashMap<String, Integer> tableSchema = new HashMap<String, Integer>();
- for(int i=1;i<=rmeta.getColumnCount();i++) {
- tableSchema.put(rmeta.getColumnName(i),rmeta.getColumnType(i));
- }
- dbSchema.put(table, tableSchema);
- } catch(SQLException ex) {
- log.debug("table: "+table+" template does not exist, MDL will not load data for this table.");
- }
- }
- dbWriter.close();
- } catch (SQLException ex) {
- log.error(ex,ex);
- }
+ try {
+ // The newInstance() call is a work around for some
+ // broken Java implementations
+ String jdbcDriver = System.getenv("JDBC_DRIVER");
+ Class.forName(jdbcDriver).newInstance();
+ log.debug("Initialized JDBC URL: " + jdbc_url);
+ } catch (Exception ex) {
+ // handle the error
+ log.error(ex, ex);
}
-
- public void interrupt() {
+ try {
+ conn = DriverManager.getConnection(jdbc_url);
+ HashMap<String, String> dbNames = mdlConfig.startWith("report.db.name.");
+ Iterator<String> ki = dbNames.keySet().iterator();
+ dbSchema = new HashMap<String, HashMap<String, Integer>>();
+ DatabaseWriter dbWriter = new DatabaseWriter(cluster);
+ while (ki.hasNext()) {
+ String table = dbNames.get(ki.next().toString());
+ String query = "select * from " + table + "_template limit 1";
+ try {
+ ResultSet rs = dbWriter.query(query);
+ ResultSetMetaData rmeta = rs.getMetaData();
+ HashMap<String, Integer> tableSchema = new HashMap<String, Integer>();
+ for (int i = 1; i <= rmeta.getColumnCount(); i++) {
+ tableSchema.put(rmeta.getColumnName(i), rmeta.getColumnType(i));
+ }
+ dbSchema.put(table, tableSchema);
+ } catch (SQLException ex) {
+ log
+ .debug("table: "
+ + table
+ + " template does not exist, MDL will not load data for this table.");
+ }
+ }
+ dbWriter.close();
+ } catch (SQLException ex) {
+ log.error(ex, ex);
}
-
- private String escape(String s,String c){
-
- String ns = s.trim();
- Pattern pattern=Pattern.compile(" +");
- Matcher matcher = pattern.matcher(ns);
- String s2= matcher.replaceAll(c);
+ }
- return s2;
+ public void interrupt() {
+ }
-
- }
-
- public void process(Path source) throws IOException, URISyntaxException, SQLException {
-
- System.out.println("Input file:" + source.getName());
+ private String escape(String s, String c) {
- ChukwaConfiguration conf = new ChukwaConfiguration();
- String fsName = conf.get("writer.hdfs.filesystem");
- FileSystem fs = FileSystem.get(new URI(fsName), conf);
-
- SequenceFile.Reader r =
- new SequenceFile.Reader(fs,source, conf);
-
- stmt = conn.createStatement();
- conn.setAutoCommit(false);
-
- ChukwaRecordKey key = new ChukwaRecordKey();
- ChukwaRecord record = new ChukwaRecord();
+ String ns = s.trim();
+ Pattern pattern = Pattern.compile(" +");
+ Matcher matcher = pattern.matcher(ns);
+ String s2 = matcher.replaceAll(c);
+
+ return s2;
+
+ }
+
+ public void process(Path source) throws IOException, URISyntaxException,
+ SQLException {
+
+ System.out.println("Input file:" + source.getName());
+
+ ChukwaConfiguration conf = new ChukwaConfiguration();
+ String fsName = conf.get("writer.hdfs.filesystem");
+ FileSystem fs = FileSystem.get(new URI(fsName), conf);
+
+ SequenceFile.Reader r = new SequenceFile.Reader(fs, source, conf);
+
+ stmt = conn.createStatement();
+ conn.setAutoCommit(false);
+
+ ChukwaRecordKey key = new ChukwaRecordKey();
+ ChukwaRecord record = new ChukwaRecord();
+ try {
+ int batch = 0;
+ while (r.next(key, record)) {
+ boolean isSuccessful = true;
+ String sqlTime = DatabaseWriter.formatTimeStamp(record.getTime());
+ log.debug("Timestamp: " + record.getTime());
+ log.debug("DataType: " + key.getReduceType());
+ log.debug("StreamName: " + source.getName());
+
+ String[] fields = record.getFields();
+ String table = null;
+ String[] priKeys = null;
+ HashMap<String, HashMap<String, String>> hashReport = new HashMap<String, HashMap<String, String>>();
+ String normKey = new String();
+ String node = record.getValue("csource");
+ String recordType = key.getReduceType().toLowerCase();
+ if (dbTables.containsKey("report.db.name." + recordType)) {
+
+ String[] tmp = mdlConfig.findTableName(mdlConfig
+ .get("report.db.name." + recordType), record.getTime(), record
+ .getTime());
+ table = tmp[0];
+ } else {
+ log.debug("report.db.name." + recordType + " does not exist.");
+ continue;
+ }
+ log.debug("table name:" + table);
try {
- int batch=0;
- while (r.next(key, record)) {
- boolean isSuccessful=true;
- String sqlTime = DatabaseWriter.formatTimeStamp(record.getTime());
- log.debug("Timestamp: " + record.getTime());
- log.debug("DataType: " + key.getReduceType());
- log.debug("StreamName: " + source.getName());
-
- String[] fields = record.getFields();
- String table = null;
- String[] priKeys = null;
- HashMap<String, HashMap<String, String>> hashReport = new HashMap<String ,HashMap<String, String>>();
- String normKey = new String();
- String node = record.getValue("csource");
- String recordType = key.getReduceType().toLowerCase();
- if(dbTables.containsKey("report.db.name."+recordType)) {
-
- String[] tmp = mdlConfig.findTableName(mdlConfig.get("report.db.name."+recordType), record.getTime(), record.getTime());
- table = tmp[0];
- } else {
- log.debug("report.db.name."+recordType+" does not exist.");
- continue;
- }
- log.debug("table name:"+table);
- try {
- priKeys = mdlConfig.get("report.db.primary.key."+recordType).split(",");
- } catch (Exception nullException) {
- }
- for (String field: fields) {
- String keyName = escape(field.toLowerCase(),newSpace);
- String keyValue = escape(record.getValue(field).toLowerCase(),newSpace);
- if(normalize.containsKey("normalize." + recordType + "." + keyName)) {
- if(normKey.equals("")) {
- normKey = keyName + "." + keyValue;
- } else {
- normKey = normKey + "." + keyName + "." + keyValue;
- }
- }
- String normalizedKey = "metric." + recordType + "." + normKey;
- if(hashReport.containsKey(node)) {
- HashMap<String, String> tmpHash = hashReport.get(node);
- tmpHash.put(normalizedKey, keyValue);
- hashReport.put(node, tmpHash);
- } else {
- HashMap<String, String> tmpHash = new HashMap<String, String>();
- tmpHash.put(normalizedKey, keyValue);
- hashReport.put(node, tmpHash);
- }
- }
- for (String field: fields){
- String valueName=escape(field.toLowerCase(),newSpace);
- String valueValue=escape(record.getValue(field).toLowerCase(),newSpace);
- String normalizedKey = "metric." + recordType + "." + valueName;
- if(!normKey.equals("")) {
- normalizedKey = "metric." + recordType + "." + normKey + "." + valueName;
- }
- if(hashReport.containsKey(node)) {
- HashMap<String, String> tmpHash = hashReport.get(node);
- tmpHash.put(normalizedKey, valueValue);
- hashReport.put(node, tmpHash);
- } else {
- HashMap<String, String> tmpHash = new HashMap<String, String>();
- tmpHash.put(normalizedKey, valueValue);
- hashReport.put(node, tmpHash);
-
- }
-
- }
- Iterator<String> i = hashReport.keySet().iterator();
- while(i.hasNext()) {
- long currentTimeMillis = System.currentTimeMillis();
- Object iteratorNode = i.next();
- HashMap<String, String> recordSet = hashReport.get(iteratorNode);
- Iterator<String> fi = recordSet.keySet().iterator();
- // Map any primary key that was not included in the report keyName
- String sqlPriKeys = "";
- try {
- for (String priKey : priKeys) {
- if(priKey.equals("timestamp")) {
- sqlPriKeys = sqlPriKeys + priKey + " = \"" + sqlTime +"\"";
- }
- if(!priKey.equals(priKeys[priKeys.length-1])) {
- sqlPriKeys = sqlPriKeys + ", ";
- }
- }
- } catch (Exception nullException) {
- // ignore if primary key is empty
- }
- // Map the hash objects to database table columns
- String sqlValues = "";
- boolean firstValue=true;
- while(fi.hasNext()) {
- String fieldKey = (String) fi.next();
- if(transformer.containsKey(fieldKey)) {
- if(!firstValue) {
- sqlValues=sqlValues+", ";
- }
- try {
- if(dbSchema.get(dbTables.get("report.db.name."+recordType)).get(transformer.get(fieldKey))== java.sql.Types.VARCHAR||
- dbSchema.get(dbTables.get("report.db.name."+recordType)).get(transformer.get(fieldKey))== java.sql.Types.BLOB) {
- if(conversion.containsKey("conversion."+fieldKey)) {
- sqlValues = sqlValues + transformer.get(fieldKey) + "=" + recordSet.get(fieldKey) + conversion.get("conversion."+fieldKey).toString();
- } else {
- sqlValues = sqlValues + transformer.get(fieldKey) + "=\"" + recordSet.get(fieldKey)+"\"";
- }
- } else {
- double tmp;
- tmp=Double.parseDouble(recordSet.get(fieldKey).toString());
- if(conversion.containsKey("conversion."+fieldKey)) {
- tmp=tmp*Double.parseDouble(conversion.get("conversion."+fieldKey).toString());
- }
- if(Double.isNaN(tmp)) {
- tmp=0;
- }
- sqlValues = sqlValues + transformer.get(fieldKey) + "=" + tmp;
- }
- firstValue=false;
- } catch (NumberFormatException ex) {
- if(conversion.containsKey("conversion."+fieldKey)) {
- sqlValues = sqlValues + transformer.get(fieldKey) + "=" + recordSet.get(fieldKey) + conversion.get("conversion."+fieldKey).toString();
- } else {
- sqlValues = sqlValues + transformer.get(fieldKey) + "=\"" + recordSet.get(fieldKey)+"\"";
- }
- firstValue=false;
- }
- }
- }
-
- String sql = null;
- if(sqlPriKeys.length()>0) {
- sql = "INSERT INTO " + table + " SET " + sqlPriKeys + "," + sqlValues +
- " ON DUPLICATE KEY UPDATE " + sqlPriKeys + "," + sqlValues + ";";
- } else {
- sql = "INSERT INTO " + table + " SET " + sqlValues +
- " ON DUPLICATE KEY UPDATE " + sqlValues + ";";
- }
- log.debug(sql);
- if(batchMode) {
- stmt.addBatch(sql);
- batch++;
- } else {
- stmt.execute(sql);
- }
- String logMsg = (isSuccessful ? "Saved" : "Error occurred in saving");
- long latencyMillis = System.currentTimeMillis() - currentTimeMillis;
- int latencySeconds = ((int)(latencyMillis + 500)) / 1000;
- if(batchMode && batch>20000) {
- int[] updateCounts = stmt.executeBatch();
- batch=0;
- }
- log.debug(logMsg + " (" + recordType + "," + RecordUtil.getClusterName(record) +
- "," + record.getTime() +
- ") " + latencySeconds + " sec");
- }
-
- }
- if(batchMode) {
- int[] updateCounts = stmt.executeBatch();
- }
- } catch (SQLException ex) {
- // handle any errors
- log.error(ex, ex);
- log.error("SQLException: " + ex.getMessage());
- log.error("SQLState: " + ex.getSQLState());
- log.error("VendorError: " + ex.getErrorCode());
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- if (rs != null) {
- try {
- rs.close();
- } catch (SQLException sqlEx) {
- // ignore
- }
- rs = null;
+ priKeys = mdlConfig.get("report.db.primary.key." + recordType).split(
+ ",");
+ } catch (Exception nullException) {
+ }
+ for (String field : fields) {
+ String keyName = escape(field.toLowerCase(), newSpace);
+ String keyValue = escape(record.getValue(field).toLowerCase(),
+ newSpace);
+ if (normalize.containsKey("normalize." + recordType + "." + keyName)) {
+ if (normKey.equals("")) {
+ normKey = keyName + "." + keyValue;
+ } else {
+ normKey = normKey + "." + keyName + "." + keyValue;
}
- if (stmt != null) {
- try {
- stmt.close();
- } catch (SQLException sqlEx) {
- // ignore
+ }
+ String normalizedKey = "metric." + recordType + "." + normKey;
+ if (hashReport.containsKey(node)) {
+ HashMap<String, String> tmpHash = hashReport.get(node);
+ tmpHash.put(normalizedKey, keyValue);
+ hashReport.put(node, tmpHash);
+ } else {
+ HashMap<String, String> tmpHash = new HashMap<String, String>();
+ tmpHash.put(normalizedKey, keyValue);
+ hashReport.put(node, tmpHash);
+ }
+ }
+ for (String field : fields) {
+ String valueName = escape(field.toLowerCase(), newSpace);
+ String valueValue = escape(record.getValue(field).toLowerCase(),
+ newSpace);
+ String normalizedKey = "metric." + recordType + "." + valueName;
+ if (!normKey.equals("")) {
+ normalizedKey = "metric." + recordType + "." + normKey + "."
+ + valueName;
+ }
+ if (hashReport.containsKey(node)) {
+ HashMap<String, String> tmpHash = hashReport.get(node);
+ tmpHash.put(normalizedKey, valueValue);
+ hashReport.put(node, tmpHash);
+ } else {
+ HashMap<String, String> tmpHash = new HashMap<String, String>();
+ tmpHash.put(normalizedKey, valueValue);
+ hashReport.put(node, tmpHash);
+
+ }
+
+ }
+ Iterator<String> i = hashReport.keySet().iterator();
+ while (i.hasNext()) {
+ long currentTimeMillis = System.currentTimeMillis();
+ Object iteratorNode = i.next();
+ HashMap<String, String> recordSet = hashReport.get(iteratorNode);
+ Iterator<String> fi = recordSet.keySet().iterator();
+ // Map any primary key that was not included in the report keyName
+ String sqlPriKeys = "";
+ try {
+ for (String priKey : priKeys) {
+ if (priKey.equals("timestamp")) {
+ sqlPriKeys = sqlPriKeys + priKey + " = \"" + sqlTime + "\"";
+ }
+ if (!priKey.equals(priKeys[priKeys.length - 1])) {
+ sqlPriKeys = sqlPriKeys + ", ";
+ }
+ }
+ } catch (Exception nullException) {
+ // ignore if primary key is empty
+ }
+ // Map the hash objects to database table columns
+ String sqlValues = "";
+ boolean firstValue = true;
+ while (fi.hasNext()) {
+ String fieldKey = (String) fi.next();
+ if (transformer.containsKey(fieldKey)) {
+ if (!firstValue) {
+ sqlValues = sqlValues + ", ";
+ }
+ try {
+ if (dbSchema.get(dbTables.get("report.db.name." + recordType))
+ .get(transformer.get(fieldKey)) == java.sql.Types.VARCHAR
+ || dbSchema.get(
+ dbTables.get("report.db.name." + recordType)).get(
+ transformer.get(fieldKey)) == java.sql.Types.BLOB) {
+ if (conversion.containsKey("conversion." + fieldKey)) {
+ sqlValues = sqlValues + transformer.get(fieldKey) + "="
+ + recordSet.get(fieldKey)
+ + conversion.get("conversion." + fieldKey).toString();
+ } else {
+ sqlValues = sqlValues + transformer.get(fieldKey) + "=\""
+ + recordSet.get(fieldKey) + "\"";
+ }
+ } else {
+ double tmp;
+ tmp = Double.parseDouble(recordSet.get(fieldKey).toString());
+ if (conversion.containsKey("conversion." + fieldKey)) {
+ tmp = tmp
+ * Double.parseDouble(conversion.get(
+ "conversion." + fieldKey).toString());
+ }
+ if (Double.isNaN(tmp)) {
+ tmp = 0;
+ }
+ sqlValues = sqlValues + transformer.get(fieldKey) + "=" + tmp;
+ }
+ firstValue = false;
+ } catch (NumberFormatException ex) {
+ if (conversion.containsKey("conversion." + fieldKey)) {
+ sqlValues = sqlValues + transformer.get(fieldKey) + "="
+ + recordSet.get(fieldKey)
+ + conversion.get("conversion." + fieldKey).toString();
+ } else {
+ sqlValues = sqlValues + transformer.get(fieldKey) + "=\""
+ + recordSet.get(fieldKey) + "\"";
}
- stmt = null;
+ firstValue = false;
+ }
}
- }
+ }
+
+ String sql = null;
+ if (sqlPriKeys.length() > 0) {
+ sql = "INSERT INTO " + table + " SET " + sqlPriKeys + ","
+ + sqlValues + " ON DUPLICATE KEY UPDATE " + sqlPriKeys + ","
+ + sqlValues + ";";
+ } else {
+ sql = "INSERT INTO " + table + " SET " + sqlValues
+ + " ON DUPLICATE KEY UPDATE " + sqlValues + ";";
+ }
+ log.debug(sql);
+ if (batchMode) {
+ stmt.addBatch(sql);
+ batch++;
+ } else {
+ stmt.execute(sql);
+ }
+ String logMsg = (isSuccessful ? "Saved" : "Error occurred in saving");
+ long latencyMillis = System.currentTimeMillis() - currentTimeMillis;
+ int latencySeconds = ((int) (latencyMillis + 500)) / 1000;
+ if (batchMode && batch > 20000) {
+ int[] updateCounts = stmt.executeBatch();
+ batch = 0;
+ }
+ log.debug(logMsg + " (" + recordType + ","
+ + RecordUtil.getClusterName(record) + "," + record.getTime()
+ + ") " + latencySeconds + " sec");
+ }
+
+ }
+ if (batchMode) {
+ int[] updateCounts = stmt.executeBatch();
+ }
+ } catch (SQLException ex) {
+ // handle any errors
+ log.error(ex, ex);
+ log.error("SQLException: " + ex.getMessage());
+ log.error("SQLState: " + ex.getSQLState());
+ log.error("VendorError: " + ex.getErrorCode());
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if (rs != null) {
+ try {
+ rs.close();
+ } catch (SQLException sqlEx) {
+ // ignore
+ }
+ rs = null;
+ }
+ if (stmt != null) {
+ try {
+ stmt.close();
+ } catch (SQLException sqlEx) {
+ // ignore
+ }
+ stmt = null;
+ }
}
-
- public static void main(String[] args) {
- try {
- MetricDataLoader mdl = new MetricDataLoader(args[0]);
- mdl.process(new Path(args[1]));
- } catch(Exception e) {
- e.printStackTrace();
- }
+ }
+
+ public static void main(String[] args) {
+ try {
+ MetricDataLoader mdl = new MetricDataLoader(args[0]);
+ mdl.process(new Path(args[1]));
+ } catch (Exception e) {
+ e.printStackTrace();
}
-
+ }
+
}
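
For context on the reformatted loader above: process() accumulates MySQL
"INSERT ... ON DUPLICATE KEY UPDATE" upserts on a JDBC Statement and flushes
them with executeBatch() once the batch passes 20000. A minimal standalone
sketch of that pattern follows (not part of this commit; the JDBC URL and the
system_metrics table are hypothetical, and it assumes a MySQL driver on the
classpath):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class BatchUpsertSketch {
  public static void main(String[] args) throws SQLException {
    // Hypothetical URL; MetricDataLoader resolves its URL per cluster via
    // ClusterConfig and loads the driver named by the JDBC_DRIVER env var.
    Connection conn = DriverManager.getConnection("jdbc:mysql://localhost/chukwa");
    conn.setAutoCommit(false);
    Statement stmt = conn.createStatement();
    int batch = 0;
    for (int i = 0; i < 100; i++) {
      double load = i * 0.1;
      // The MySQL upsert idiom, built as a plain string as in process().
      String sql = "INSERT INTO system_metrics SET host=\"node" + i
          + "\", load_avg=" + load
          + " ON DUPLICATE KEY UPDATE load_avg=" + load + ";";
      stmt.addBatch(sql);
      if (++batch > 20000) { // same flush threshold as process()
        stmt.executeBatch();
        batch = 0;
      }
    }
    stmt.executeBatch(); // flush the remainder
    conn.commit();
    stmt.close();
    conn.close();
  }
}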
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java Wed Mar 11 22:39:26 2009
@@ -26,20 +26,19 @@
import org.apache.hadoop.mapred.lib.MultipleSequenceFileOutputFormat;
import org.apache.log4j.Logger;
-public class ChukwaRecordOutputFormat extends MultipleSequenceFileOutputFormat<ChukwaRecordKey, ChukwaRecord>
-{
- static Logger log = Logger.getLogger(ChukwaRecordOutputFormat.class);
-
- @Override
- protected String generateFileNameForKeyValue(ChukwaRecordKey key, ChukwaRecord record,
- String name)
- {
- String output = RecordUtil.getClusterName(record)
- + "/" + key.getReduceType()
- + "/" + key.getReduceType() + Util.generateTimeOutput(record.getTime());
+public class ChukwaRecordOutputFormat extends
+ MultipleSequenceFileOutputFormat<ChukwaRecordKey, ChukwaRecord> {
+ static Logger log = Logger.getLogger(ChukwaRecordOutputFormat.class);
- //{log.info("ChukwaOutputFormat.fileName: [" + output +"]");}
-
- return output;
- }
+ @Override
+ protected String generateFileNameForKeyValue(ChukwaRecordKey key,
+ ChukwaRecord record, String name) {
+ String output = RecordUtil.getClusterName(record) + "/"
+ + key.getReduceType() + "/" + key.getReduceType()
+ + Util.generateTimeOutput(record.getTime());
+
+ // {log.info("ChukwaOutputFormat.fileName: [" + output +"]");}
+
+ return output;
+ }
}
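
The override above routes every record to a file named
cluster/<reduceType>/<reduceType><timeSuffix>. A tiny sketch of the name it
produces (the cluster, type, and time suffix below are made up;
Util.generateTimeOutput() supplies the real suffix):

public class OutputNameSketch {
  // Mirrors generateFileNameForKeyValue(): cluster "/" type "/" type timePart.
  static String generateName(String cluster, String reduceType, String timePart) {
    return cluster + "/" + reduceType + "/" + reduceType + timePart;
  }

  public static void main(String[] args) {
    System.out.println(generateName("demo-cluster", "SysLog", "_2009_03_11_22"));
    // prints: demo-cluster/SysLog/SysLog_2009_03_11_22
  }
}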
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordPartitioner.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordPartitioner.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordPartitioner.java Wed Mar 11 22:39:26 2009
@@ -18,30 +18,34 @@
package org.apache.hadoop.chukwa.extraction.demux;
+
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.log4j.Logger;
-public class ChukwaRecordPartitioner<K, V>
- implements Partitioner<ChukwaRecordKey, ChukwaRecord>
-{
- static Logger log = Logger.getLogger(ChukwaRecordPartitioner.class);
- public void configure(JobConf arg0)
- {}
-
- public int getPartition(org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey key,
- org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord record, int numReduceTasks)
- {
- if (log.isDebugEnabled())
- {
-
- log.debug("Partitioner key: [" + key.getReduceType()
- + "] - Reducer:"
- + ( (key.getReduceType().hashCode() & Integer.MAX_VALUE) % numReduceTasks));
- }
- return (key.getReduceType().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
- }
+public class ChukwaRecordPartitioner<K, V> implements
+ Partitioner<ChukwaRecordKey, ChukwaRecord> {
+ static Logger log = Logger.getLogger(ChukwaRecordPartitioner.class);
+
+ public void configure(JobConf arg0) {
+ }
+
+ public int getPartition(
+ org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey key,
+ org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord record,
+ int numReduceTasks) {
+ if (log.isDebugEnabled()) {
+
+ log
+ .debug("Partitioner key: ["
+ + key.getReduceType()
+ + "] - Reducer:"
+ + ((key.getReduceType().hashCode() & Integer.MAX_VALUE) % numReduceTasks));
+ }
+ return (key.getReduceType().hashCode() & Integer.MAX_VALUE)
+ % numReduceTasks;
+ }
}
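
The partition arithmetic above masks the hash with Integer.MAX_VALUE rather
than calling Math.abs(). The mask clears the sign bit, so the result stays in
[0, numReduceTasks) even for a hash of Integer.MIN_VALUE, where Math.abs()
returns MIN_VALUE unchanged. A small sketch (the sample strings are
arbitrary):

public class PartitionSketch {
  // Same arithmetic as ChukwaRecordPartitioner.getPartition().
  static int partition(String reduceType, int numReduceTasks) {
    return (reduceType.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

  public static void main(String[] args) {
    System.out.println(partition("SysLog", 3));
    // "polygenelubricants".hashCode() is Integer.MIN_VALUE; the mask keeps
    // the partition at 0, where an abs()-based version would yield -2.
    System.out.println(partition("polygenelubricants", 3));
  }
}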
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java Wed Mar 11 22:39:26 2009
@@ -18,13 +18,13 @@
package org.apache.hadoop.chukwa.extraction.demux;
+
import java.io.IOException;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
-
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
@@ -45,198 +45,195 @@
import org.apache.log4j.Logger;
// TODO do an abstract class for all rolling
-public class DailyChukwaRecordRolling extends Configured implements Tool
-{
- static Logger log = Logger.getLogger(DailyChukwaRecordRolling.class);
-
- static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
- static ChukwaConfiguration conf = null;
- static FileSystem fs = null;
- static final String HadoopLogDir = "_logs";
- static final String hadoopTempDir = "_temporary";
-
- static boolean rollInSequence = true;
- static boolean deleteRawdata = false;
-
- public static void usage()
- {
- System.err.println("usage: java org.apache.hadoop.chukwa.extraction.demux.DailyChukwaRecordRolling rollInSequence <True/False> deleteRawdata <True/False>");
- System.exit(-1);
- }
-
-
- public static void buildDailyFiles(String chukwaMainRepository, String tempDir,String rollingFolder, int workingDay) throws IOException
- {
- // process
- Path dayPath = new Path(rollingFolder + "/daily/" + workingDay) ;
- FileStatus[] clustersFS = fs.listStatus(dayPath);
- for(FileStatus clusterFs : clustersFS)
- {
- String cluster = clusterFs.getPath().getName();
-
- Path dataSourceClusterHourPaths = new Path(rollingFolder + "/daily/" + workingDay + "/" + cluster) ;
- FileStatus[] dataSourcesFS = fs.listStatus(dataSourceClusterHourPaths);
- for(FileStatus dataSourceFS : dataSourcesFS)
- {
- String dataSource = dataSourceFS.getPath().getName();
- // Repo path = reposRootDirectory/<cluster>/<day>/*/*.evt
-
- // put the rotate flag
- fs.mkdirs(new Path(chukwaMainRepository + "/" + cluster + "/" + dataSource + "/" + workingDay + "/rotateDone"));
-
- // rotate
- // Merge
- String[] mergeArgs = new String[5];
- // input
- mergeArgs[0] = chukwaMainRepository + "/" + cluster + "/" + dataSource + "/" + workingDay + "/[0-24]*/*.evt";
- // temp dir
- mergeArgs[1] = tempDir + "/" + cluster + "/" + dataSource + "/" + workingDay + "_" + System.currentTimeMillis();
- // final output dir
- mergeArgs[2] = chukwaMainRepository + "/" + cluster + "/" + dataSource + "/" + workingDay ;
- // final output fileName
- mergeArgs[3] = dataSource +"_" + workingDay ;
- // delete rolling directory
- mergeArgs[4] = rollingFolder + "/daily/" + workingDay + "/" + cluster + "/" + dataSource;
-
-
- log.info("DailyChukwaRecordRolling 0: " + mergeArgs[0] );
- log.info("DailyChukwaRecordRolling 1: " + mergeArgs[1] );
- log.info("DailyChukwaRecordRolling 2: " + mergeArgs[2] );
- log.info("DailyChukwaRecordRolling 3: " + mergeArgs[3] );
- log.info("DailyChukwaRecordRolling 4: " + mergeArgs[4] );
-
- RecordMerger merge = new RecordMerger(conf,fs,new DailyChukwaRecordRolling(),mergeArgs,deleteRawdata);
- List<RecordMerger> allMerge = new ArrayList<RecordMerger>();
- if (rollInSequence)
- { merge.run(); }
- else
- {
- allMerge.add(merge);
- merge.start();
- }
-
- // join all Threads
- if (!rollInSequence)
- {
- while(allMerge.size() > 0)
- {
- RecordMerger m = allMerge.remove(0);
- try
- { m.join(); }
- catch (InterruptedException e) {}
- }
- } // End if (!rollInSequence)
-
- // Delete the processed dataSourceFS
- FileUtil.fullyDelete(fs,dataSourceFS.getPath());
-
- } // End for(FileStatus dataSourceFS : dataSourcesFS)
-
- // Delete the processed clusterFs
- FileUtil.fullyDelete(fs,clusterFs.getPath());
-
- } // End for(FileStatus clusterFs : clustersFS)
-
- // Delete the processed dayPath
- FileUtil.fullyDelete(fs,dayPath);
- }
-
- /**
- * @param args
- * @throws Exception
- */
- public static void main(String[] args) throws Exception
- {
- conf = new ChukwaConfiguration();
- String fsName = conf.get("writer.hdfs.filesystem");
- fs = FileSystem.get(new URI(fsName), conf);
-
- // TODO read from config
- String rollingFolder = "/chukwa/rolling/";
- String chukwaMainRepository = "/chukwa/repos/";
- String tempDir = "/chukwa/temp/dailyRolling/";
-
-
- // TODO do a real parameter parsing
- if (args.length != 4)
- { usage(); }
-
- if (!args[0].equalsIgnoreCase("rollInSequence"))
- { usage(); }
-
- if (!args[2].equalsIgnoreCase("deleteRawdata"))
- { usage(); }
-
- if (args[1].equalsIgnoreCase("true"))
- { rollInSequence = true; }
- else
- { rollInSequence = false; }
-
- if (args[3].equalsIgnoreCase("true"))
- { deleteRawdata = true; }
- else
- { deleteRawdata = false; }
-
-
- log.info("rollInSequence: " + rollInSequence);
- log.info("deleteRawdata: " + deleteRawdata);
-
- Calendar calendar = Calendar.getInstance();
- int currentDay = Integer.parseInt(sdf.format(calendar.getTime()));
- int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
- log.info("CurrentDay: " + currentDay);
- log.info("currentHour" + currentHour);
-
- Path rootFolder = new Path(rollingFolder + "/daily/") ;
-
- FileStatus[] daysFS = fs.listStatus(rootFolder);
- for(FileStatus dayFS : daysFS)
- {
- try
- {
- int workingDay = Integer.parseInt(dayFS.getPath().getName());
- if ( workingDay < currentDay)
- {
- buildDailyFiles(chukwaMainRepository, tempDir,rollingFolder,workingDay);
- } // End if ( workingDay < currentDay)
- } // End Try workingDay = Integer.parseInt(sdf.format(dayFS.getPath().getName()));
- catch(NumberFormatException e)
- { /* Not a standard Day directory skip */ }
-
- } // for(FileStatus dayFS : daysFS)
- }
-
-
- public int run(String[] args) throws Exception
- {
- JobConf conf = new JobConf(getConf(), DailyChukwaRecordRolling.class);
-
- conf.setJobName("DailyChukwa-Rolling");
- conf.setInputFormat(SequenceFileInputFormat.class);
-
- conf.setMapperClass(IdentityMapper.class);
- conf.setReducerClass(IdentityReducer.class);
-
-
- conf.setOutputKeyClass(ChukwaRecordKey.class);
- conf.setOutputValueClass(ChukwaRecord.class);
- conf.setOutputFormat(SequenceFileOutputFormat.class);
-
- conf.set("mapred.compress.map.output", "true");
- conf.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.LzoCodec");
- conf.set("mapred.output.compress", "true");
- conf.set("mapred.output.compression.type", "BLOCK");
-
-
- log.info("DailyChukwaRecordRolling input: " + args[0] );
- log.info("DailyChukwaRecordRolling output: " + args[1] );
-
-
- FileInputFormat.setInputPaths(conf, args[0]);
- FileOutputFormat.setOutputPath(conf, new Path(args[1]));
-
- JobClient.runJob(conf);
- return 0;
- }
-
+public class DailyChukwaRecordRolling extends Configured implements Tool {
+ static Logger log = Logger.getLogger(DailyChukwaRecordRolling.class);
+
+ static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
+ static ChukwaConfiguration conf = null;
+ static FileSystem fs = null;
+ static final String HadoopLogDir = "_logs";
+ static final String hadoopTempDir = "_temporary";
+
+ static boolean rollInSequence = true;
+ static boolean deleteRawdata = false;
+
+ public static void usage() {
+ System.err
+ .println("usage: java org.apache.hadoop.chukwa.extraction.demux.DailyChukwaRecordRolling rollInSequence <True/False> deleteRawdata <True/False>");
+ System.exit(-1);
+ }
+
+ public static void buildDailyFiles(String chukwaMainRepository,
+ String tempDir, String rollingFolder, int workingDay) throws IOException {
+ // process
+ Path dayPath = new Path(rollingFolder + "/daily/" + workingDay);
+ FileStatus[] clustersFS = fs.listStatus(dayPath);
+ for (FileStatus clusterFs : clustersFS) {
+ String cluster = clusterFs.getPath().getName();
+
+ Path dataSourceClusterHourPaths = new Path(rollingFolder + "/daily/"
+ + workingDay + "/" + cluster);
+ FileStatus[] dataSourcesFS = fs.listStatus(dataSourceClusterHourPaths);
+ for (FileStatus dataSourceFS : dataSourcesFS) {
+ String dataSource = dataSourceFS.getPath().getName();
+ // Repo path = reposRootDirectory/<cluster>/<day>/*/*.evt
+
+ // put the rotate flag
+ fs.mkdirs(new Path(chukwaMainRepository + "/" + cluster + "/"
+ + dataSource + "/" + workingDay + "/rotateDone"));
+
+ // rotate
+ // Merge
+ String[] mergeArgs = new String[5];
+ // input
+ mergeArgs[0] = chukwaMainRepository + "/" + cluster + "/" + dataSource
+ + "/" + workingDay + "/[0-24]*/*.evt";
+ // temp dir
+ mergeArgs[1] = tempDir + "/" + cluster + "/" + dataSource + "/"
+ + workingDay + "_" + System.currentTimeMillis();
+ // final output dir
+ mergeArgs[2] = chukwaMainRepository + "/" + cluster + "/" + dataSource
+ + "/" + workingDay;
+ // final output fileName
+ mergeArgs[3] = dataSource + "_" + workingDay;
+ // delete rolling directory
+ mergeArgs[4] = rollingFolder + "/daily/" + workingDay + "/" + cluster
+ + "/" + dataSource;
+
+ log.info("DailyChukwaRecordRolling 0: " + mergeArgs[0]);
+ log.info("DailyChukwaRecordRolling 1: " + mergeArgs[1]);
+ log.info("DailyChukwaRecordRolling 2: " + mergeArgs[2]);
+ log.info("DailyChukwaRecordRolling 3: " + mergeArgs[3]);
+ log.info("DailyChukwaRecordRolling 4: " + mergeArgs[4]);
+
+ RecordMerger merge = new RecordMerger(conf, fs,
+ new DailyChukwaRecordRolling(), mergeArgs, deleteRawdata);
+ List<RecordMerger> allMerge = new ArrayList<RecordMerger>();
+ if (rollInSequence) {
+ merge.run();
+ } else {
+ allMerge.add(merge);
+ merge.start();
+ }
+
+ // join all Threads
+ if (!rollInSequence) {
+ while (allMerge.size() > 0) {
+ RecordMerger m = allMerge.remove(0);
+ try {
+ m.join();
+ } catch (InterruptedException e) {
+ }
+ }
+ } // End if (!rollInSequence)
+
+ // Delete the processed dataSourceFS
+ FileUtil.fullyDelete(fs, dataSourceFS.getPath());
+
+ } // End for(FileStatus dataSourceFS : dataSourcesFS)
+
+ // Delete the processed clusterFs
+ FileUtil.fullyDelete(fs, clusterFs.getPath());
+
+ } // End for(FileStatus clusterFs : clustersFS)
+
+ // Delete the processed dayPath
+ FileUtil.fullyDelete(fs, dayPath);
+ }
+
+ /**
+ * @param args
+ * @throws Exception
+ */
+ public static void main(String[] args) throws Exception {
+ conf = new ChukwaConfiguration();
+ String fsName = conf.get("writer.hdfs.filesystem");
+ fs = FileSystem.get(new URI(fsName), conf);
+
+ // TODO read from config
+ String rollingFolder = "/chukwa/rolling/";
+ String chukwaMainRepository = "/chukwa/repos/";
+ String tempDir = "/chukwa/temp/dailyRolling/";
+
+ // TODO do a real parameter parsing
+ if (args.length != 4) {
+ usage();
+ }
+
+ if (!args[0].equalsIgnoreCase("rollInSequence")) {
+ usage();
+ }
+
+ if (!args[2].equalsIgnoreCase("deleteRawdata")) {
+ usage();
+ }
+
+ if (args[1].equalsIgnoreCase("true")) {
+ rollInSequence = true;
+ } else {
+ rollInSequence = false;
+ }
+
+ if (args[3].equalsIgnoreCase("true")) {
+ deleteRawdata = true;
+ } else {
+ deleteRawdata = false;
+ }
+
+ log.info("rollInSequence: " + rollInSequence);
+ log.info("deleteRawdata: " + deleteRawdata);
+
+ Calendar calendar = Calendar.getInstance();
+ int currentDay = Integer.parseInt(sdf.format(calendar.getTime()));
+ int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
+ log.info("CurrentDay: " + currentDay);
+ log.info("currentHour" + currentHour);
+
+ Path rootFolder = new Path(rollingFolder + "/daily/");
+
+ FileStatus[] daysFS = fs.listStatus(rootFolder);
+ for (FileStatus dayFS : daysFS) {
+ try {
+ int workingDay = Integer.parseInt(dayFS.getPath().getName());
+ if (workingDay < currentDay) {
+ buildDailyFiles(chukwaMainRepository, tempDir, rollingFolder,
+ workingDay);
+ } // End if ( workingDay < currentDay)
+ } // End Try workingDay =
+ // Integer.parseInt(sdf.format(dayFS.getPath().getName()));
+ catch (NumberFormatException e) { /* Not a standard Day directory skip */
+ }
+
+ } // for(FileStatus dayFS : daysFS)
+ }
+
+ public int run(String[] args) throws Exception {
+ JobConf conf = new JobConf(getConf(), DailyChukwaRecordRolling.class);
+
+ conf.setJobName("DailyChukwa-Rolling");
+ conf.setInputFormat(SequenceFileInputFormat.class);
+
+ conf.setMapperClass(IdentityMapper.class);
+ conf.setReducerClass(IdentityReducer.class);
+
+ conf.setOutputKeyClass(ChukwaRecordKey.class);
+ conf.setOutputValueClass(ChukwaRecord.class);
+ conf.setOutputFormat(SequenceFileOutputFormat.class);
+
+ conf.set("mapred.compress.map.output", "true");
+ conf.set("mapred.map.output.compression.codec",
+ "org.apache.hadoop.io.compress.LzoCodec");
+ conf.set("mapred.output.compress", "true");
+ conf.set("mapred.output.compression.type", "BLOCK");
+
+ log.info("DailyChukwaRecordRolling input: " + args[0]);
+ log.info("DailyChukwaRecordRolling output: " + args[1]);
+
+ FileInputFormat.setInputPaths(conf, args[0]);
+ FileOutputFormat.setOutputPath(conf, new Path(args[1]));
+
+ JobClient.runJob(conf);
+ return 0;
+ }
+
}
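
buildDailyFiles() above drives each RecordMerger with five positional
arguments: the input glob, a temp dir, the final output dir, the final file
name, and the rolling directory to delete afterwards. A sketch that prints
them for a hypothetical cluster and day (the cluster and dataSource names are
made up; the path roots match the TODO-hardcoded defaults in main()):

public class MergeArgsSketch {
  public static void main(String[] args) {
    String repo = "/chukwa/repos", temp = "/chukwa/temp/dailyRolling",
        rolling = "/chukwa/rolling";
    String cluster = "demo", dataSource = "Hadoop";
    int workingDay = 20090311;

    String[] mergeArgs = new String[5];
    // input glob over the day's hourly .evt files
    mergeArgs[0] = repo + "/" + cluster + "/" + dataSource + "/" + workingDay
        + "/[0-24]*/*.evt";
    // temp dir, made unique with a timestamp
    mergeArgs[1] = temp + "/" + cluster + "/" + dataSource + "/" + workingDay
        + "_" + System.currentTimeMillis();
    // final output dir and file name
    mergeArgs[2] = repo + "/" + cluster + "/" + dataSource + "/" + workingDay;
    mergeArgs[3] = dataSource + "_" + workingDay;
    // rolling directory deleted after the merge
    mergeArgs[4] = rolling + "/daily/" + workingDay + "/" + cluster + "/"
        + dataSource;

    for (String a : mergeArgs) {
      System.out.println(a);
    }
  }
}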
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java Wed Mar 11 22:39:26 2009
@@ -18,13 +18,13 @@
package org.apache.hadoop.chukwa.extraction.demux;
+
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
-
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.extraction.demux.processor.ChukwaOutputCollector;
@@ -50,164 +50,152 @@
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
-public class Demux extends Configured implements Tool
-{
- static Logger log = Logger.getLogger(Demux.class);
- static SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd_HH_mm");
-
- public static class MapClass extends MapReduceBase implements
- Mapper<ChukwaArchiveKey, ChunkImpl , ChukwaRecordKey, ChukwaRecord>
- {
- JobConf jobConf = null;
-
- @Override
- public void configure(JobConf jobConf)
- {
- super.configure(jobConf);
- this.jobConf = jobConf;
- }
-
- public void map(ChukwaArchiveKey key, ChunkImpl chunk,
- OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
- throws IOException
- {
-
- ChukwaOutputCollector chukwaOutputCollector = new ChukwaOutputCollector("DemuxMapOutput",output,reporter);
- try
- {
- long duration = System.currentTimeMillis();
- if (log.isDebugEnabled())
- {
- log.debug("Entry: ["+ chunk.getData() + "] EventType: [" + chunk.getDataType() + "]");
- }
- String processorClass = jobConf.get(chunk.getDataType(),
- "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
-
- if (!processorClass.equalsIgnoreCase("Drop"))
- {
- reporter.incrCounter("DemuxMapInput", "total chunks", 1);
- reporter.incrCounter("DemuxMapInput", chunk.getDataType() + " chunks" , 1);
-
- MapProcessor processor = MapProcessorFactory.getProcessor(processorClass);
- processor.process(key,chunk, chukwaOutputCollector, reporter);
- if (log.isDebugEnabled())
- {
- duration = System.currentTimeMillis() - duration;
- log.debug("Demux:Map dataType:" + chunk.getDataType() +
- " duration:" + duration + " processor:" + processorClass + " recordCount:" + chunk.getRecordOffsets().length );
- }
-
- }
- else
- {
- log.info("action:Demux, dataType:" + chunk.getDataType() +
- " duration:0 processor:Drop recordCount:" + chunk.getRecordOffsets().length );
- }
-
- }
- catch(Exception e)
- {
- log.warn("Exception in Demux:MAP", e);
- e.printStackTrace();
- }
- }
- }
-
- public static class ReduceClass extends MapReduceBase implements
- Reducer<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, ChukwaRecord>
- {
- public void reduce(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
- OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
- Reporter reporter) throws IOException
- {
- ChukwaOutputCollector chukwaOutputCollector = new ChukwaOutputCollector("DemuxReduceOutput",output,reporter);
- try
- {
- long duration = System.currentTimeMillis();
+public class Demux extends Configured implements Tool {
+ static Logger log = Logger.getLogger(Demux.class);
+ static SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd_HH_mm");
+
+ public static class MapClass extends MapReduceBase implements
+ Mapper<ChukwaArchiveKey, ChunkImpl, ChukwaRecordKey, ChukwaRecord> {
+ JobConf jobConf = null;
+
+ @Override
+ public void configure(JobConf jobConf) {
+ super.configure(jobConf);
+ this.jobConf = jobConf;
+ }
+
+ public void map(ChukwaArchiveKey key, ChunkImpl chunk,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+ throws IOException {
+
+ ChukwaOutputCollector chukwaOutputCollector = new ChukwaOutputCollector(
+ "DemuxMapOutput", output, reporter);
+ try {
+ long duration = System.currentTimeMillis();
+ if (log.isDebugEnabled()) {
+ log.debug("Entry: [" + chunk.getData() + "] EventType: ["
+ + chunk.getDataType() + "]");
+ }
+ String processorClass = jobConf
+ .get(chunk.getDataType(),
+ "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
+
+ if (!processorClass.equalsIgnoreCase("Drop")) {
+ reporter.incrCounter("DemuxMapInput", "total chunks", 1);
+ reporter.incrCounter("DemuxMapInput",
+ chunk.getDataType() + " chunks", 1);
+
+ MapProcessor processor = MapProcessorFactory
+ .getProcessor(processorClass);
+ processor.process(key, chunk, chukwaOutputCollector, reporter);
+ if (log.isDebugEnabled()) {
+ duration = System.currentTimeMillis() - duration;
+ log.debug("Demux:Map dataType:" + chunk.getDataType()
+ + " duration:" + duration + " processor:" + processorClass
+ + " recordCount:" + chunk.getRecordOffsets().length);
+ }
+
+ } else {
+ log.info("action:Demux, dataType:" + chunk.getDataType()
+ + " duration:0 processor:Drop recordCount:"
+ + chunk.getRecordOffsets().length);
+ }
+
+ } catch (Exception e) {
+ log.warn("Exception in Demux:MAP", e);
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public static class ReduceClass extends MapReduceBase implements
+ Reducer<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, ChukwaRecord> {
+ public void reduce(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
+ OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+ throws IOException {
+ ChukwaOutputCollector chukwaOutputCollector = new ChukwaOutputCollector(
+ "DemuxReduceOutput", output, reporter);
+ try {
+ long duration = System.currentTimeMillis();
reporter.incrCounter("DemuxReduceInput", "total distinct keys", 1);
- reporter.incrCounter("DemuxReduceInput", key.getReduceType() +" total distinct keys" , 1);
-
- ReduceProcessorFactory.getProcessor(key.getReduceType()).process(key,values, chukwaOutputCollector, reporter);
-
- if (log.isDebugEnabled())
- {
- duration = System.currentTimeMillis() - duration;
- log.debug("Demux:Reduce, dataType:" + key.getReduceType() +" duration:" + duration);
- }
-
- }
- catch(Exception e)
- {
- log.warn("Exception in Demux:Reduce", e);
- e.printStackTrace();
- }
- }
- }
-
- static int printUsage() {
- System.out
- .println("Demux [-m <maps>] [-r <reduces>] <input> <output>");
- ToolRunner.printGenericCommandUsage(System.out);
- return -1;
- }
-
- public int run(String[] args) throws Exception
- {
- JobConf conf = new JobConf(getConf(), Demux.class);
- conf.addResource(new Path("conf/chukwa-demux-conf.xml"));
-
- conf.setJobName("Chukwa-Demux_" + day.format(new Date()));
- conf.setInputFormat(SequenceFileInputFormat.class);
- conf.setMapperClass(Demux.MapClass.class);
- conf.setPartitionerClass(ChukwaRecordPartitioner.class);
- conf.setReducerClass(Demux.ReduceClass.class);
-
- conf.setOutputKeyClass(ChukwaRecordKey.class);
- conf.setOutputValueClass(ChukwaRecord.class);
- conf.setOutputFormat(ChukwaRecordOutputFormat.class);
-
-// conf.setCompressMapOutput(true);
- // conf.setMapOutputCompressorClass(LzoCodec.class);
-
-
- List<String> other_args = new ArrayList<String>();
- for (int i = 0; i < args.length; ++i) {
- try {
- if ("-m".equals(args[i])) {
- conf.setNumMapTasks(Integer.parseInt(args[++i]));
- } else if ("-r".equals(args[i])) {
- conf.setNumReduceTasks(Integer.parseInt(args[++i]));
- } else {
- other_args.add(args[i]);
- }
- } catch (NumberFormatException except) {
- System.out.println("ERROR: Integer expected instead of "
- + args[i]);
- return printUsage();
- } catch (ArrayIndexOutOfBoundsException except) {
- System.out.println("ERROR: Required parameter missing from "
- + args[i - 1]);
- return printUsage();
- }
- }
- // Make sure there are exactly 2 parameters left.
- if (other_args.size() != 2) {
- System.out.println("ERROR: Wrong number of parameters: "
- + other_args.size() + " instead of 2.");
- return printUsage();
- }
-
- FileInputFormat.setInputPaths(conf, other_args.get(0));
- FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
-
- JobClient.runJob(conf);
- return 0;
- }
-
- public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(new Configuration(),
- new Demux(), args);
- System.exit(res);
- }
+ reporter.incrCounter("DemuxReduceInput", key.getReduceType()
+ + " total distinct keys", 1);
+
+ ReduceProcessorFactory.getProcessor(key.getReduceType()).process(key,
+ values, chukwaOutputCollector, reporter);
+
+ if (log.isDebugEnabled()) {
+ duration = System.currentTimeMillis() - duration;
+ log.debug("Demux:Reduce, dataType:" + key.getReduceType()
+ + " duration:" + duration);
+ }
+
+ } catch (Exception e) {
+ log.warn("Exception in Demux:Reduce", e);
+ e.printStackTrace();
+ }
+ }
+ }
+
+ static int printUsage() {
+ System.out.println("Demux [-m <maps>] [-r <reduces>] <input> <output>");
+ ToolRunner.printGenericCommandUsage(System.out);
+ return -1;
+ }
+
+ public int run(String[] args) throws Exception {
+ JobConf conf = new JobConf(getConf(), Demux.class);
+ conf.addResource(new Path("conf/chukwa-demux-conf.xml"));
+
+ conf.setJobName("Chukwa-Demux_" + day.format(new Date()));
+ conf.setInputFormat(SequenceFileInputFormat.class);
+ conf.setMapperClass(Demux.MapClass.class);
+ conf.setPartitionerClass(ChukwaRecordPartitioner.class);
+ conf.setReducerClass(Demux.ReduceClass.class);
+
+ conf.setOutputKeyClass(ChukwaRecordKey.class);
+ conf.setOutputValueClass(ChukwaRecord.class);
+ conf.setOutputFormat(ChukwaRecordOutputFormat.class);
+
+ // conf.setCompressMapOutput(true);
+ // conf.setMapOutputCompressorClass(LzoCodec.class);
+
+ List<String> other_args = new ArrayList<String>();
+ for (int i = 0; i < args.length; ++i) {
+ try {
+ if ("-m".equals(args[i])) {
+ conf.setNumMapTasks(Integer.parseInt(args[++i]));
+ } else if ("-r".equals(args[i])) {
+ conf.setNumReduceTasks(Integer.parseInt(args[++i]));
+ } else {
+ other_args.add(args[i]);
+ }
+ } catch (NumberFormatException except) {
+ System.out.println("ERROR: Integer expected instead of " + args[i]);
+ return printUsage();
+ } catch (ArrayIndexOutOfBoundsException except) {
+ System.out.println("ERROR: Required parameter missing from "
+ + args[i - 1]);
+ return printUsage();
+ }
+ }
+ // Make sure there are exactly 2 parameters left.
+ if (other_args.size() != 2) {
+ System.out.println("ERROR: Wrong number of parameters: "
+ + other_args.size() + " instead of 2.");
+ return printUsage();
+ }
+
+ FileInputFormat.setInputPaths(conf, other_args.get(0));
+ FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
+
+ JobClient.runJob(conf);
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(new Configuration(), new Demux(), args);
+ System.exit(res);
+ }
}
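
Demux follows the standard Tool/ToolRunner pattern: ToolRunner consumes the
generic Hadoop options before run() is called, which is why run() can parse
-m/-r and then expect exactly two positional parameters to remain. A minimal
sketch of that pattern (the class name is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ToolSketch extends Configured implements Tool {
  public int run(String[] args) throws Exception {
    // Generic options (-D, -conf, -fs, ...) are already gone by this point.
    for (String a : args) {
      System.out.println("arg: " + a);
    }
    return 0;
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new Configuration(), new ToolSketch(), args));
  }
}

A typical invocation, with a hypothetical jar name and paths:

  hadoop jar chukwa-core.jar org.apache.hadoop.chukwa.extraction.demux.Demux \
      -m 8 -r 4 /chukwa/dataSinkArchives /chukwa/demuxProcessing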
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java Wed Mar 11 22:39:26 2009
@@ -18,13 +18,13 @@
package org.apache.hadoop.chukwa.extraction.demux;
+
import java.io.IOException;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
-
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
@@ -45,210 +45,213 @@
import org.apache.log4j.Logger;
// TODO do an abstract class for all rolling
-public class HourlyChukwaRecordRolling extends Configured implements Tool
-{
- static Logger log = Logger.getLogger(HourlyChukwaRecordRolling.class);
-
- static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
- static ChukwaConfiguration conf = null;
- static FileSystem fs = null;
- static final String HadoopLogDir = "_logs";
- static final String hadoopTempDir = "_temporary";
-
- static boolean rollInSequence = true;
- static boolean deleteRawdata = false;
-
- public static void usage()
- {
- System.err.println("usage: java org.apache.hadoop.chukwa.extraction.demux.HourlyChukwaRecordRolling rollInSequence <True/False> deleteRawdata <True/False>");
- System.exit(-1);
- }
-
-
- public static void buildHourlyFiles(String chukwaMainRepository, String tempDir,String rollingFolder, int workingDay, int workingHour) throws IOException
- {
- // process
- Path hourPath = new Path(rollingFolder + "/hourly/" + workingDay + "/" + workingHour) ;
- FileStatus[] clustersFS = fs.listStatus(hourPath);
- for(FileStatus clusterFs : clustersFS)
- {
- String cluster = clusterFs.getPath().getName();
-
- Path dataSourceClusterHourPaths = new Path(rollingFolder + "/hourly/" + workingDay + "/" + workingHour + "/" + cluster) ;
- FileStatus[] dataSourcesFS = fs.listStatus(dataSourceClusterHourPaths);
- for(FileStatus dataSourceFS : dataSourcesFS)
- {
- String dataSource = dataSourceFS.getPath().getName();
- // Repo path = reposRootDirectory/<cluster>/<day>/<hour>/*/*.evt
-
- // put the rotate flag
- fs.mkdirs(new Path(chukwaMainRepository + "/" + cluster + "/" + dataSource + "/" + workingDay + "/" + workingHour + "/rotateDone"));
-
- // rotate
- // Merge
- String[] mergeArgs = new String[5];
- // input
- mergeArgs[0] = chukwaMainRepository + "/" + cluster + "/" + dataSource + "/" + workingDay + "/" + workingHour + "/[0-5]*/*.evt";
- // temp dir
- mergeArgs[1] = tempDir + "/" + cluster + "/" + dataSource + "/" + workingDay + "/" + workingHour + "_" + System.currentTimeMillis() ;
- // final output dir
- mergeArgs[2] = chukwaMainRepository + "/" + cluster + "/" + dataSource + "/" + workingDay + "/" + workingHour ;
- // final output fileName
- mergeArgs[3] = dataSource +"_" + workingDay +"_" + workingHour;
- // delete rolling directory
- mergeArgs[4] = rollingFolder + "/hourly/" + workingDay + "/" + workingHour + "/" + cluster + "/" + dataSource;
-
-
- log.info("HourlyChukwaRecordRolling 0: " + mergeArgs[0] );
- log.info("HourlyChukwaRecordRolling 1: " + mergeArgs[1] );
- log.info("HourlyChukwaRecordRolling 2: " + mergeArgs[2] );
- log.info("HourlyChukwaRecordRolling 3: " + mergeArgs[3] );
- log.info("HourlyChukwaRecordRolling 4: " + mergeArgs[4] );
-
- RecordMerger merge = new RecordMerger(conf,fs,new HourlyChukwaRecordRolling(), mergeArgs,deleteRawdata);
- List<RecordMerger> allMerge = new ArrayList<RecordMerger>();
- if (rollInSequence)
- { merge.run(); }
- else
- {
- allMerge.add(merge);
- merge.start();
- }
-
- // join all Threads
- if (!rollInSequence)
- {
- while(allMerge.size() > 0)
- {
- RecordMerger m = allMerge.remove(0);
- try
- { m.join(); }
- catch (InterruptedException e) {}
- }
- } // End if (!rollInSequence)
-
- // Delete the processed dataSourceFS
- FileUtil.fullyDelete(fs,dataSourceFS.getPath());
-
- } // End for(FileStatus dataSourceFS : dataSourcesFS)
-
- // Delete the processed clusterFs
- FileUtil.fullyDelete(fs,clusterFs.getPath());
-
- } // End for(FileStatus clusterFs : clustersFS)
-
- // Delete the processed hour
- FileUtil.fullyDelete(fs,hourPath);
- }
-
- /**
- * @param args
- * @throws Exception
- */
- public static void main(String[] args) throws Exception
- {
- conf = new ChukwaConfiguration();
- String fsName = conf.get("writer.hdfs.filesystem");
- fs = FileSystem.get(new URI(fsName), conf);
-
- // TODO read from config
- String rollingFolder = "/chukwa/rolling/";
- String chukwaMainRepository = "/chukwa/repos/";
- String tempDir = "/chukwa/temp/hourlyRolling/";
-
-
- // TODO do a real parameter parsing
- if (args.length != 4)
- { usage(); }
-
- if (!args[0].equalsIgnoreCase("rollInSequence"))
- { usage(); }
-
- if (!args[2].equalsIgnoreCase("deleteRawdata"))
- { usage(); }
-
- if (args[1].equalsIgnoreCase("true"))
- { rollInSequence = true; }
- else
- { rollInSequence = false; }
-
- if (args[3].equalsIgnoreCase("true"))
- { deleteRawdata = true; }
- else
- { deleteRawdata = false; }
-
-
-
- Calendar calendar = Calendar.getInstance();
- int currentDay = Integer.parseInt(sdf.format(calendar.getTime()));
- int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
- log.info("CurrentDay: " + currentDay);
- log.info("currentHour" + currentHour);
-
- Path rootFolder = new Path(rollingFolder + "/hourly/") ;
-
- FileStatus[] daysFS = fs.listStatus(rootFolder);
- for(FileStatus dayFS : daysFS)
- {
- try
- {
- log.info("dayFs:" + dayFS.getPath().getName());
- int workingDay = Integer.parseInt(dayFS.getPath().getName());
-
- Path hourlySrc = new Path(rollingFolder + "/hourly/" + workingDay) ;
- FileStatus[] hoursFS = fs.listStatus(hourlySrc);
- for(FileStatus hourFS : hoursFS)
- {
- String workinhHourStr = hourFS.getPath().getName();
- int workingHour = Integer.parseInt(workinhHourStr);
- if (
- (workingDay < currentDay) || // all previous days
- ( (workingDay == currentDay) && (workingHour < currentHour) ) // Up to the last hour
- )
- {
-
- buildHourlyFiles(chukwaMainRepository,tempDir,rollingFolder, workingDay,workingHour);
-
- } // End if ( (workingDay < currentDay) || ( (workingDay == currentDay) && (intHour < currentHour) ) )
- } // End for(FileStatus hourFS : hoursFS)
- } // End Try workingDay = Integer.parseInt(sdf.format(dayFS.getPath().getName()));
- catch(NumberFormatException e)
- { /* Not a standard Day directory skip */ }
-
- } // for(FileStatus dayFS : daysFS)
- }
-
-
- public int run(String[] args) throws Exception
- {
- JobConf conf = new JobConf(getConf(), HourlyChukwaRecordRolling.class);
-
- conf.setJobName("HourlyChukwa-Rolling");
- conf.setInputFormat(SequenceFileInputFormat.class);
-
- conf.setMapperClass(IdentityMapper.class);
- conf.setReducerClass(IdentityReducer.class);
-
-
- conf.setOutputKeyClass(ChukwaRecordKey.class);
- conf.setOutputValueClass(ChukwaRecord.class);
- conf.setOutputFormat(SequenceFileOutputFormat.class);
-
- conf.set("mapred.compress.map.output", "true");
- conf.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.LzoCodec");
- conf.set("mapred.output.compress", "true");
- conf.set("mapred.output.compression.type", "BLOCK");
-
-
- log.info("HourlyChukwaRecordRolling input: " + args[0] );
- log.info("HourlyChukwaRecordRolling output: " + args[1] );
-
-
- FileInputFormat.setInputPaths(conf, args[0]);
- FileOutputFormat.setOutputPath(conf, new Path(args[1]));
-
- JobClient.runJob(conf);
- return 0;
- }
-
+public class HourlyChukwaRecordRolling extends Configured implements Tool {
+ static Logger log = Logger.getLogger(HourlyChukwaRecordRolling.class);
+
+ static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
+ static ChukwaConfiguration conf = null;
+ static FileSystem fs = null;
+ static final String HadoopLogDir = "_logs";
+ static final String hadoopTempDir = "_temporary";
+
+ static boolean rollInSequence = true;
+ static boolean deleteRawdata = false;
+
+ public static void usage() {
+ System.err
+ .println("usage: java org.apache.hadoop.chukwa.extraction.demux.HourlyChukwaRecordRolling rollInSequence <True/False> deleteRawdata <True/False>");
+ System.exit(-1);
+ }
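+
+  // Example invocation matching the usage string above (flag values are
+  // illustrative; both are parsed positionally in main() below):
+  //   java org.apache.hadoop.chukwa.extraction.demux.HourlyChukwaRecordRolling \
+  //       rollInSequence true deleteRawdata false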
+
+ public static void buildHourlyFiles(String chukwaMainRepository,
+ String tempDir, String rollingFolder, int workingDay, int workingHour)
+ throws IOException {
+ // process
+ Path hourPath = new Path(rollingFolder + "/hourly/" + workingDay + "/"
+ + workingHour);
+ FileStatus[] clustersFS = fs.listStatus(hourPath);
+ for (FileStatus clusterFs : clustersFS) {
+ String cluster = clusterFs.getPath().getName();
+
+ Path dataSourceClusterHourPaths = new Path(rollingFolder + "/hourly/"
+ + workingDay + "/" + workingHour + "/" + cluster);
+ FileStatus[] dataSourcesFS = fs.listStatus(dataSourceClusterHourPaths);
+ for (FileStatus dataSourceFS : dataSourcesFS) {
+ String dataSource = dataSourceFS.getPath().getName();
+ // Repo path = reposRootDirectory/<cluster>/<day>/<hour>/*/*.evt
+
+ // put the rotate flag
+        fs.mkdirs(new Path(chukwaMainRepository + "/" + cluster + "/"
+            + dataSource + "/" + workingDay + "/" + workingHour
+            + "/rotateDone"));
+
+ // rotate
+ // Merge
+ String[] mergeArgs = new String[5];
+ // input
+ mergeArgs[0] = chukwaMainRepository + "/" + cluster + "/" + dataSource
+ + "/" + workingDay + "/" + workingHour + "/[0-5]*/*.evt";
+ // temp dir
+ mergeArgs[1] = tempDir + "/" + cluster + "/" + dataSource + "/"
+ + workingDay + "/" + workingHour + "_" + System.currentTimeMillis();
+ // final output dir
+ mergeArgs[2] = chukwaMainRepository + "/" + cluster + "/" + dataSource
+ + "/" + workingDay + "/" + workingHour;
+ // final output fileName
+ mergeArgs[3] = dataSource + "_" + workingDay + "_" + workingHour;
+ // delete rolling directory
+ mergeArgs[4] = rollingFolder + "/hourly/" + workingDay + "/"
+ + workingHour + "/" + cluster + "/" + dataSource;
+
+ log.info("HourlyChukwaRecordRolling 0: " + mergeArgs[0]);
+ log.info("HourlyChukwaRecordRolling 1: " + mergeArgs[1]);
+ log.info("HourlyChukwaRecordRolling 2: " + mergeArgs[2]);
+ log.info("HourlyChukwaRecordRolling 3: " + mergeArgs[3]);
+ log.info("HourlyChukwaRecordRolling 4: " + mergeArgs[4]);
+
+ RecordMerger merge = new RecordMerger(conf, fs,
+ new HourlyChukwaRecordRolling(), mergeArgs, deleteRawdata);
+ List<RecordMerger> allMerge = new ArrayList<RecordMerger>();
+ if (rollInSequence) {
+ merge.run();
+ } else {
+ allMerge.add(merge);
+ merge.start();
+ }
+
+ // join all Threads
+ if (!rollInSequence) {
+ while (allMerge.size() > 0) {
+ RecordMerger m = allMerge.remove(0);
+ try {
+ m.join();
+ } catch (InterruptedException e) {
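+              // Ignore the interrupt and keep joining the remaining merge threads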
+ }
+ }
+ } // End if (!rollInSequence)
+
+ // Delete the processed dataSourceFS
+ FileUtil.fullyDelete(fs, dataSourceFS.getPath());
+
+ } // End for(FileStatus dataSourceFS : dataSourcesFS)
+
+ // Delete the processed clusterFs
+ FileUtil.fullyDelete(fs, clusterFs.getPath());
+
+ } // End for(FileStatus clusterFs : clustersFS)
+
+ // Delete the processed hour
+ FileUtil.fullyDelete(fs, hourPath);
+ }
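+
+  // Worked example of the five merge arguments above, assuming a hypothetical
+  // cluster "clusterA", data source "Hadoop", day 20090311, hour 17, and the
+  // default folders from main() below (timestamp suffix is illustrative):
+  //   [0] input:      /chukwa/repos/clusterA/Hadoop/20090311/17/[0-5]*/*.evt
+  //   [1] temp dir:   /chukwa/temp/hourlyRolling/clusterA/Hadoop/20090311/17_1236811166000
+  //   [2] output dir: /chukwa/repos/clusterA/Hadoop/20090311/17
+  //   [3] file name:  Hadoop_20090311_17
+  //   [4] to delete:  /chukwa/rolling/hourly/20090311/17/clusterA/Hadoop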
+
+ /**
+ * @param args
+ * @throws Exception
+ */
+ public static void main(String[] args) throws Exception {
+ conf = new ChukwaConfiguration();
+ String fsName = conf.get("writer.hdfs.filesystem");
+ fs = FileSystem.get(new URI(fsName), conf);
+
+ // TODO read from config
+ String rollingFolder = "/chukwa/rolling/";
+ String chukwaMainRepository = "/chukwa/repos/";
+ String tempDir = "/chukwa/temp/hourlyRolling/";
+
+ // TODO do a real parameter parsing
+ if (args.length != 4) {
+ usage();
+ }
+
+ if (!args[0].equalsIgnoreCase("rollInSequence")) {
+ usage();
+ }
+
+ if (!args[2].equalsIgnoreCase("deleteRawdata")) {
+ usage();
+ }
+
+ if (args[1].equalsIgnoreCase("true")) {
+ rollInSequence = true;
+ } else {
+ rollInSequence = false;
+ }
+
+ if (args[3].equalsIgnoreCase("true")) {
+ deleteRawdata = true;
+ } else {
+ deleteRawdata = false;
+ }
+
+ Calendar calendar = Calendar.getInstance();
+ int currentDay = Integer.parseInt(sdf.format(calendar.getTime()));
+ int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
+ log.info("CurrentDay: " + currentDay);
+ log.info("currentHour" + currentHour);
+
+ Path rootFolder = new Path(rollingFolder + "/hourly/");
+
+ FileStatus[] daysFS = fs.listStatus(rootFolder);
+ for (FileStatus dayFS : daysFS) {
+ try {
+ log.info("dayFs:" + dayFS.getPath().getName());
+ int workingDay = Integer.parseInt(dayFS.getPath().getName());
+
+ Path hourlySrc = new Path(rollingFolder + "/hourly/" + workingDay);
+ FileStatus[] hoursFS = fs.listStatus(hourlySrc);
+ for (FileStatus hourFS : hoursFS) {
+        String workingHourStr = hourFS.getPath().getName();
+        int workingHour = Integer.parseInt(workingHourStr);
+        if ((workingDay < currentDay) || // all previous days
+            ((workingDay == currentDay) && (workingHour < currentHour)) // up to the last hour
+        ) {
+
+ buildHourlyFiles(chukwaMainRepository, tempDir, rollingFolder,
+ workingDay, workingHour);
+
+        } // End if ((workingDay < currentDay) || ((workingDay == currentDay) && (workingHour < currentHour)))
+ } // End for(FileStatus hourFS : hoursFS)
+      } // End try
+      catch (NumberFormatException e) {
+        /* Not a standard day directory, skip it */
+      }
+
+ } // for(FileStatus dayFS : daysFS)
+ }
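+
+  // Worked example of the cutoff above: if main() runs on 20090311 at 22:40
+  // (currentDay = 20090311, currentHour = 22), every hour directory under
+  // 20090310 and earlier is rolled, and under 20090311 only hours 0 through
+  // 21; hour 22 is still being written to and is skipped.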
+
+ public int run(String[] args) throws Exception {
+ JobConf conf = new JobConf(getConf(), HourlyChukwaRecordRolling.class);
+
+ conf.setJobName("HourlyChukwa-Rolling");
+ conf.setInputFormat(SequenceFileInputFormat.class);
+
+ conf.setMapperClass(IdentityMapper.class);
+ conf.setReducerClass(IdentityReducer.class);
+
+ conf.setOutputKeyClass(ChukwaRecordKey.class);
+ conf.setOutputValueClass(ChukwaRecord.class);
+ conf.setOutputFormat(SequenceFileOutputFormat.class);
+
+ conf.set("mapred.compress.map.output", "true");
+ conf.set("mapred.map.output.compression.codec",
+ "org.apache.hadoop.io.compress.LzoCodec");
+ conf.set("mapred.output.compress", "true");
+ conf.set("mapred.output.compression.type", "BLOCK");
+
+ log.info("HourlyChukwaRecordRolling input: " + args[0]);
+ log.info("HourlyChukwaRecordRolling output: " + args[1]);
+
+ FileInputFormat.setInputPaths(conf, args[0]);
+ FileOutputFormat.setOutputPath(conf, new Path(args[1]));
+
+ JobClient.runJob(conf);
+ return 0;
+ }
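+
+  // run() is a standard Tool implementation, driven by RecordMerger with a
+  // two-element args array. A minimal standalone sketch, with hypothetical
+  // input/output paths:
+  //   String[] jobArgs = new String[2];
+  //   jobArgs[0] = "/chukwa/repos/clusterA/Hadoop/20090311/17/[0-5]*/*.evt";
+  //   jobArgs[1] = "/chukwa/temp/hourlyRolling/merge-17";
+  //   int res = ToolRunner.run(new ChukwaConfiguration(),
+  //       new HourlyChukwaRecordRolling(), jobArgs);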
+
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveOrMergeRecordFile.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveOrMergeRecordFile.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveOrMergeRecordFile.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveOrMergeRecordFile.java Wed Mar 11 22:39:26 2009
@@ -18,9 +18,9 @@
package org.apache.hadoop.chukwa.extraction.demux;
+
import java.io.IOException;
import java.net.URI;
-
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
@@ -41,198 +41,184 @@
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-public class MoveOrMergeRecordFile extends Configured implements Tool
-{
- static ChukwaConfiguration conf = null;
- static FileSystem fs = null;
- static final String HadoopLogDir = "_logs";
- static final String hadoopTempDir = "_temporary";
-
- public int run(String[] args) throws Exception
- {
- JobConf conf = new JobConf(getConf(), MoveOrMergeRecordFile.class);
-
- conf.setJobName("Chukwa-MoveOrMergeLogFile");
- conf.setInputFormat(SequenceFileInputFormat.class);
-
- conf.setMapperClass(IdentityMapper.class);
- conf.setReducerClass(IdentityReducer.class);
-
- //conf.setPartitionerClass(ChukwaPartitioner.class);
- //conf.setOutputFormat(ChukwaOutputFormat.class);
-
- conf.setOutputKeyClass(ChukwaRecordKey.class);
- conf.setOutputValueClass(ChukwaRecord.class);
- conf.setOutputFormat(SequenceFileOutputFormat.class);
-
- FileInputFormat.setInputPaths(conf, args[0]);
- FileOutputFormat.setOutputPath(conf, new Path(args[1]));
-
- JobClient.runJob(conf);
- return 0;
- }
-
-
- static void moveOrMergeOneCluster(Path srcDir,String destDir) throws Exception
- {
- System.out.println("moveOrMergeOneCluster (" + srcDir.getName() + "," + destDir +")");
- FileStatus fstat = fs.getFileStatus(srcDir);
-
- if (!fstat.isDir())
- {
- throw new IOException(srcDir + " is not a directory!");
- }
- else
- {
- FileStatus[] datasourceDirectories = fs.listStatus(srcDir);
- for(FileStatus datasourceDirectory : datasourceDirectories)
- {
- System.out.println(datasourceDirectory.getPath() + " isDir?" +datasourceDirectory.isDir());
- if (!datasourceDirectory.isDir())
- {
- throw new IOException("Top level should just contains directories :" + datasourceDirectory.getPath());
- }
-
- String dirName = datasourceDirectory.getPath().getName();
-
- Path destPath = new Path(destDir + "/" + dirName);
- System.out.println("dest directory path: " + destPath);
-
- if (!fs.exists(destPath))
- {
- System.out.println("create datasource directory [" + destDir + "/" + dirName + "]");
- fs.mkdirs(destPath);
- }
-
- FileStatus[] evts = fs.listStatus(datasourceDirectory.getPath(),new EventFileFilter());
- for(FileStatus eventFile : evts)
- {
-
- Path eventFilePath = eventFile.getPath();
- String filename = eventFilePath.getName();
- System.out.println("src dir File: ["+ filename+"]");
- Path destFilePath = new Path(destDir + "/" + dirName + "/" + filename);
- if (!fs.exists(destFilePath))
- {
- System.out.println("Moving File: [" + destFilePath +"]");
- // Copy to final Location
- FileUtil.copy(fs,eventFilePath,fs,destFilePath,false,false,conf);
- }
- else
- {
- System.out.println("Need to merge! : [" + destFilePath +"]");
- String strMrPath = datasourceDirectory.getPath().toString()+ "/" + "MR_" + System.currentTimeMillis();
- Path mrPath = new Path(strMrPath);
- System.out.println("\t New MR directory : [" + mrPath +"]");
- // Create MR input Dir
- fs.mkdirs(mrPath);
- // Move Input files
- FileUtil.copy(fs,eventFilePath,fs,new Path(strMrPath+"/1.evt"),false,false,conf);
- fs.rename(destFilePath, new Path(strMrPath+"/2.evt"));
-
- // Merge
- String[] mergeArgs = new String[2];
- mergeArgs[0] = strMrPath;
- mergeArgs[1] = strMrPath + "/mrOutput";
- DoMerge merge = new DoMerge(conf,fs,eventFilePath,destFilePath,mergeArgs);
- merge.start();
- }
- }
- }
- }
-
- }
-
- /**
- * @param args
- * @throws Exception
- */
- public static void main(String[] args) throws Exception
- {
- conf = new ChukwaConfiguration();
- String fsName = conf.get("writer.hdfs.filesystem");
- fs = FileSystem.get(new URI(fsName), conf);
-
- Path srcDir = new Path(args[0]);
- String destDir = args[1];
-
-
- FileStatus fstat = fs.getFileStatus(srcDir);
-
- if (!fstat.isDir())
- {
- throw new IOException(srcDir + " is not a directory!");
- }
- else
- {
- FileStatus[] clusters = fs.listStatus(srcDir);
- // Run a moveOrMerge on all clusters
- String name = null;
- for(FileStatus cluster : clusters)
- {
- name = cluster.getPath().getName();
- // Skip hadoop outDir
- if ( (name.intern() == HadoopLogDir.intern() ) || (name.intern() == hadoopTempDir.intern()) )
- {
- continue;
- }
- moveOrMergeOneCluster(cluster.getPath(),destDir + "/" + cluster.getPath().getName());
- }
- }
- System.out.println("Done with moveOrMerge main()");
- }
+public class MoveOrMergeRecordFile extends Configured implements Tool {
+ static ChukwaConfiguration conf = null;
+ static FileSystem fs = null;
+ static final String HadoopLogDir = "_logs";
+ static final String hadoopTempDir = "_temporary";
+
+ public int run(String[] args) throws Exception {
+ JobConf conf = new JobConf(getConf(), MoveOrMergeRecordFile.class);
+
+ conf.setJobName("Chukwa-MoveOrMergeLogFile");
+ conf.setInputFormat(SequenceFileInputFormat.class);
+
+ conf.setMapperClass(IdentityMapper.class);
+ conf.setReducerClass(IdentityReducer.class);
+
+ // conf.setPartitionerClass(ChukwaPartitioner.class);
+ // conf.setOutputFormat(ChukwaOutputFormat.class);
+
+ conf.setOutputKeyClass(ChukwaRecordKey.class);
+ conf.setOutputValueClass(ChukwaRecord.class);
+ conf.setOutputFormat(SequenceFileOutputFormat.class);
+
+ FileInputFormat.setInputPaths(conf, args[0]);
+ FileOutputFormat.setOutputPath(conf, new Path(args[1]));
+
+ JobClient.runJob(conf);
+ return 0;
+ }
+
+ static void moveOrMergeOneCluster(Path srcDir, String destDir)
+ throws Exception {
+ System.out.println("moveOrMergeOneCluster (" + srcDir.getName() + ","
+ + destDir + ")");
+ FileStatus fstat = fs.getFileStatus(srcDir);
+
+ if (!fstat.isDir()) {
+ throw new IOException(srcDir + " is not a directory!");
+ } else {
+ FileStatus[] datasourceDirectories = fs.listStatus(srcDir);
+ for (FileStatus datasourceDirectory : datasourceDirectories) {
+ System.out.println(datasourceDirectory.getPath() + " isDir?"
+ + datasourceDirectory.isDir());
+ if (!datasourceDirectory.isDir()) {
+          throw new IOException("Top level should just contain directories: "
+              + datasourceDirectory.getPath());
+ }
+
+ String dirName = datasourceDirectory.getPath().getName();
+
+ Path destPath = new Path(destDir + "/" + dirName);
+ System.out.println("dest directory path: " + destPath);
+
+ if (!fs.exists(destPath)) {
+ System.out.println("create datasource directory [" + destDir + "/"
+ + dirName + "]");
+ fs.mkdirs(destPath);
+ }
+
+ FileStatus[] evts = fs.listStatus(datasourceDirectory.getPath(),
+ new EventFileFilter());
+ for (FileStatus eventFile : evts) {
+
+ Path eventFilePath = eventFile.getPath();
+ String filename = eventFilePath.getName();
+ System.out.println("src dir File: [" + filename + "]");
+ Path destFilePath = new Path(destDir + "/" + dirName + "/" + filename);
+ if (!fs.exists(destFilePath)) {
+ System.out.println("Moving File: [" + destFilePath + "]");
+ // Copy to final Location
+ FileUtil.copy(fs, eventFilePath, fs, destFilePath, false, false,
+ conf);
+ } else {
+ System.out.println("Need to merge! : [" + destFilePath + "]");
+ String strMrPath = datasourceDirectory.getPath().toString() + "/"
+ + "MR_" + System.currentTimeMillis();
+ Path mrPath = new Path(strMrPath);
+ System.out.println("\t New MR directory : [" + mrPath + "]");
+ // Create MR input Dir
+ fs.mkdirs(mrPath);
+ // Move Input files
+ FileUtil.copy(fs, eventFilePath, fs,
+ new Path(strMrPath + "/1.evt"), false, false, conf);
+ fs.rename(destFilePath, new Path(strMrPath + "/2.evt"));
+
+ // Merge
+ String[] mergeArgs = new String[2];
+ mergeArgs[0] = strMrPath;
+ mergeArgs[1] = strMrPath + "/mrOutput";
+ DoMerge merge = new DoMerge(conf, fs, eventFilePath, destFilePath,
+ mergeArgs);
+ merge.start();
+ }
+ }
+ }
+ }
+
+ }
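+
+  // Tracing the merge branch above with a hypothetical collision: if
+  // Hadoop/foo.evt already exists at the destination, the incoming file is
+  // copied to Hadoop/MR_<timestamp>/1.evt, the existing destination file is
+  // renamed to Hadoop/MR_<timestamp>/2.evt, and DoMerge (below) runs the M/R
+  // job with input MR_<timestamp> and output MR_<timestamp>/mrOutput, then
+  // copies the merged part-00000 back to the destination path.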
+
+ /**
+ * @param args
+ * @throws Exception
+ */
+ public static void main(String[] args) throws Exception {
+ conf = new ChukwaConfiguration();
+ String fsName = conf.get("writer.hdfs.filesystem");
+ fs = FileSystem.get(new URI(fsName), conf);
+
+ Path srcDir = new Path(args[0]);
+ String destDir = args[1];
+
+ FileStatus fstat = fs.getFileStatus(srcDir);
+
+ if (!fstat.isDir()) {
+ throw new IOException(srcDir + " is not a directory!");
+ } else {
+ FileStatus[] clusters = fs.listStatus(srcDir);
+ // Run a moveOrMerge on all clusters
+ String name = null;
+ for (FileStatus cluster : clusters) {
+ name = cluster.getPath().getName();
+ // Skip hadoop outDir
+        if (name.equals(HadoopLogDir) || name.equals(hadoopTempDir)) {
+ continue;
+ }
+ moveOrMergeOneCluster(cluster.getPath(), destDir + "/"
+ + cluster.getPath().getName());
+ }
+ }
+ System.out.println("Done with moveOrMerge main()");
+ }
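+
+  // Example invocation (paths are illustrative): args[0] is the source
+  // directory of clusters, args[1] the destination repository root:
+  //   java org.apache.hadoop.chukwa.extraction.demux.MoveOrMergeRecordFile \
+  //       /chukwa/demuxOutput /chukwa/repos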
}
-class DoMerge extends Thread
-{
- ChukwaConfiguration conf = null;
- FileSystem fs = null;
- String[] mergeArgs = new String[2];
- Path destFilePath = null;
- Path eventFilePath = null;
- public DoMerge(ChukwaConfiguration conf,FileSystem fs,
- Path eventFilePath,Path destFilePath,String[] mergeArgs)
- {
- this.conf = conf;
- this.fs = fs;
- this.eventFilePath = eventFilePath;
- this.destFilePath = destFilePath;
- this.mergeArgs = mergeArgs;
- }
- @Override
- public void run()
- {
- System.out.println("\t Running Merge! : output [" + mergeArgs[1] +"]");
- int res;
- try
- {
- res = ToolRunner.run(new ChukwaConfiguration(),new MoveOrMergeRecordFile(), mergeArgs);
- System.out.println("MR exit status: " + res);
- if (res == 0)
- {
- System.out.println("\t Moving output file : to [" + destFilePath +"]");
- FileUtil.copy(fs,new Path(mergeArgs[1]+"/part-00000"),fs,destFilePath,false,false,conf);
- fs.rename(new Path(mergeArgs[1]+"/part-00000"), eventFilePath);
- }
- else
- {
- throw new RuntimeException("Error in M/R merge operation!");
- }
-
- }
- catch (Exception e)
- {
- e.printStackTrace();
- throw new RuntimeException("Error in M/R merge operation!",e);
- }
- }
-
+class DoMerge extends Thread {
+ ChukwaConfiguration conf = null;
+ FileSystem fs = null;
+ String[] mergeArgs = new String[2];
+ Path destFilePath = null;
+ Path eventFilePath = null;
+
+ public DoMerge(ChukwaConfiguration conf, FileSystem fs, Path eventFilePath,
+ Path destFilePath, String[] mergeArgs) {
+ this.conf = conf;
+ this.fs = fs;
+ this.eventFilePath = eventFilePath;
+ this.destFilePath = destFilePath;
+ this.mergeArgs = mergeArgs;
+ }
+
+ @Override
+ public void run() {
+ System.out.println("\t Running Merge! : output [" + mergeArgs[1] + "]");
+ int res;
+ try {
+ res = ToolRunner.run(new ChukwaConfiguration(),
+ new MoveOrMergeRecordFile(), mergeArgs);
+ System.out.println("MR exit status: " + res);
+ if (res == 0) {
+ System.out.println("\t Moving output file : to [" + destFilePath + "]");
+ FileUtil.copy(fs, new Path(mergeArgs[1] + "/part-00000"), fs,
+ destFilePath, false, false, conf);
+ fs.rename(new Path(mergeArgs[1] + "/part-00000"), eventFilePath);
+ } else {
+ throw new RuntimeException("Error in M/R merge operation!");
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException("Error in M/R merge operation!", e);
+ }
+ }
+
}
class EventFileFilter implements PathFilter {
- public boolean accept(Path path) {
- return (path.toString().endsWith(".evt"));
- }
- }
+ public boolean accept(Path path) {
+ return (path.toString().endsWith(".evt"));
+ }
+}