Posted to commits@chukwa.apache.org by as...@apache.org on 2009/03/11 23:39:32 UTC
svn commit: r752666 [2/16] - in /hadoop/chukwa/trunk: ./
src/java/org/apache/hadoop/chukwa/ src/java/org/apache/hadoop/chukwa/conf/
src/java/org/apache/hadoop/chukwa/database/
src/java/org/apache/hadoop/chukwa/datacollection/
src/java/org/apache/hadoop...
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/Consolidator.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/Consolidator.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/Consolidator.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/Consolidator.java Wed Mar 11 22:39:26 2009
@@ -18,13 +18,13 @@
package org.apache.hadoop.chukwa.database;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
import org.apache.hadoop.chukwa.util.DatabaseWriter;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.chukwa.util.PidFile;
-
import java.sql.SQLException;
import java.sql.ResultSet;
import java.util.Calendar;
@@ -34,223 +34,233 @@
import java.text.SimpleDateFormat;
public class Consolidator extends Thread {
- private DatabaseConfig dbc = new DatabaseConfig();
+ private DatabaseConfig dbc = new DatabaseConfig();
- private static Log log = LogFactory.getLog(Consolidator.class);
- private String table = null;
- private int[] intervals;
- private static PidFile loader=null;
-
- public Consolidator(String table, String intervalString) {
- super(table);
- try {
- int i=0;
- String[] temp = intervalString.split("\\,");
- intervals = new int[temp.length];
- for(String s: temp) {
- intervals[i]=Integer.parseInt(s);
- i++;
- }
- this.table = table;
- } catch (NumberFormatException ex) {
- log.error("Unable to parse summary interval");
- }
- }
- public void run() {
- ResultSet rs = null;
- String[] columns;
- int[] columnsType;
- String groupBy = "";
-
- for(int interval : intervals) {
- // Start reducing from beginning of time;
- Calendar aYearAgo = Calendar.getInstance();
- aYearAgo.set(2008, 1, 1, 0, 0, 0);
-
- long start = aYearAgo.getTimeInMillis(); //starting from 2008/01/01
- long end = start + (interval*60000);
- log.debug("start time: "+start);
- log.debug("end time: "+end);
- Calendar now = Calendar.getInstance();
- String cluster = System.getProperty("CLUSTER");
- if(cluster==null) {
- cluster="unknown";
- }
- DatabaseWriter db = new DatabaseWriter(cluster);
- String fields = null;
- String dateclause = null;
- boolean emptyPrimeKey = false;
- log.info("Consolidate for "+interval+" minutes interval.");
-
- String[] tmpTable = dbc.findTableName(this.table, start, end);
- String table = tmpTable[0];
- String sumTable="";
- if(interval==5) {
- long partition=now.getTime().getTime() / DatabaseConfig.WEEK;
- StringBuilder stringBuilder = new StringBuilder();
- stringBuilder.append(this.table);
- stringBuilder.append("_");
- stringBuilder.append(partition);
- stringBuilder.append("_week");
- table=stringBuilder.toString();
- long partition2=now.getTime().getTime() / DatabaseConfig.MONTH;
- sumTable =this.table+"_"+partition2+"_month";
- } else if(interval==30) {
- long partition=now.getTime().getTime() / DatabaseConfig.MONTH;
- table=this.table+"_"+partition+"_month";
- long partition2=now.getTime().getTime() / DatabaseConfig.QUARTER;
- sumTable =this.table+"_"+partition2+"_month";
- } else if(interval==180) {
- long partition=now.getTime().getTime() / DatabaseConfig.QUARTER;
- table=this.table+"_"+partition+"_quarter";
- long partition2=now.getTime().getTime() / DatabaseConfig.YEAR;
- sumTable =this.table+"_"+partition2+"_month";
- } else if(interval==720) {
- long partition=now.getTime().getTime() / DatabaseConfig.YEAR;
- table=this.table+"_"+partition+"_year";
- long partition2=now.getTime().getTime() / DatabaseConfig.DECADE;
- sumTable =this.table+"_"+partition2+"_month";
- }
- // Find the most recent entry
- try {
- String query = "select * from "+sumTable+" order by timestamp desc limit 1";
- log.debug("Query: "+query);
- rs = db.query(query);
- if(rs==null) {
- throw new SQLException("Table is undefined.");
- }
- ResultSetMetaData rmeta = rs.getMetaData();
- boolean empty=true;
- if(rs.next()) {
- for(int i=1;i<=rmeta.getColumnCount();i++) {
- if(rmeta.getColumnName(i).toLowerCase().equals("timestamp")) {
- start = rs.getTimestamp(i).getTime();
- }
- }
- empty=false;
- }
- if(empty) {
- throw new SQLException("Table is empty.");
- }
- end = start + (interval*60000);
- } catch (SQLException ex) {
- try {
- String query = "select * from "+table+" order by timestamp limit 1";
- log.debug("Query: "+query);
- rs = db.query(query);
- if(rs.next()) {
- ResultSetMetaData rmeta = rs.getMetaData();
- for(int i=1;i<=rmeta.getColumnCount();i++) {
- if(rmeta.getColumnName(i).toLowerCase().equals("timestamp")) {
- start = rs.getTimestamp(i).getTime();
- }
- }
- }
- end = start + (interval*60000);
- } catch(SQLException ex2) {
- log.error("Unable to determine starting point in table: "+this.table);
- log.error("SQL Error:"+ExceptionUtil.getStackTrace(ex2));
- return;
- }
- }
- try {
- ResultSetMetaData rmeta = rs.getMetaData();
- int col = rmeta.getColumnCount();
- columns = new String[col];
- columnsType = new int[col];
- for(int i=1;i<=col;i++) {
- columns[i-1]=rmeta.getColumnName(i);
- columnsType[i-1]=rmeta.getColumnType(i);
- }
+ private static Log log = LogFactory.getLog(Consolidator.class);
+ private String table = null;
+ private int[] intervals;
+ private static PidFile loader = null;
+
+ public Consolidator(String table, String intervalString) {
+ super(table);
+ try {
+ int i = 0;
+ String[] temp = intervalString.split("\\,");
+ intervals = new int[temp.length];
+ for (String s : temp) {
+ intervals[i] = Integer.parseInt(s);
+ i++;
+ }
+ this.table = table;
+ } catch (NumberFormatException ex) {
+ log.error("Unable to parse summary interval");
+ }
+ }
- for(int i=0;i<columns.length;i++) {
- if(i==0) {
- fields=columns[i];
- if(columnsType[i]==java.sql.Types.VARCHAR) {
- if(groupBy.equals("")) {
- groupBy = " group by "+columns[i];
- } else {
- groupBy = groupBy+","+columns[i];
- }
- }
- } else {
- if(columnsType[i]==java.sql.Types.VARCHAR || columnsType[i]==java.sql.Types.TIMESTAMP) {
- fields=fields+","+columns[i];
- if(columnsType[i]==java.sql.Types.VARCHAR) {
- if(groupBy.equals("")) {
- groupBy = " group by "+columns[i];
- } else {
- groupBy = groupBy+","+columns[i];
- }
- }
- } else {
- fields=fields+",AVG("+columns[i]+") as "+columns[i];
- }
- }
- }
- } catch(SQLException ex) {
- log.error("SQL Error:"+ExceptionUtil.getStackTrace(ex));
- return;
- }
- if(groupBy.equals("")) {
- emptyPrimeKey = true;
+ public void run() {
+ ResultSet rs = null;
+ String[] columns;
+ int[] columnsType;
+ String groupBy = "";
+
+ for (int interval : intervals) {
+ // Start reducing from beginning of time;
+ Calendar aYearAgo = Calendar.getInstance();
+ aYearAgo.set(2008, 1, 1, 0, 0, 0);
+
+ long start = aYearAgo.getTimeInMillis(); // starting from 2008/01/01
+ long end = start + (interval * 60000);
+ log.debug("start time: " + start);
+ log.debug("end time: " + end);
+ Calendar now = Calendar.getInstance();
+ String cluster = System.getProperty("CLUSTER");
+ if (cluster == null) {
+ cluster = "unknown";
+ }
+ DatabaseWriter db = new DatabaseWriter(cluster);
+ String fields = null;
+ String dateclause = null;
+ boolean emptyPrimeKey = false;
+ log.info("Consolidate for " + interval + " minutes interval.");
+
+ String[] tmpTable = dbc.findTableName(this.table, start, end);
+ String table = tmpTable[0];
+ String sumTable = "";
+ if (interval == 5) {
+ long partition = now.getTime().getTime() / DatabaseConfig.WEEK;
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append(this.table);
+ stringBuilder.append("_");
+ stringBuilder.append(partition);
+ stringBuilder.append("_week");
+ table = stringBuilder.toString();
+ long partition2 = now.getTime().getTime() / DatabaseConfig.MONTH;
+ sumTable = this.table + "_" + partition2 + "_month";
+ } else if (interval == 30) {
+ long partition = now.getTime().getTime() / DatabaseConfig.MONTH;
+ table = this.table + "_" + partition + "_month";
+ long partition2 = now.getTime().getTime() / DatabaseConfig.QUARTER;
+ sumTable = this.table + "_" + partition2 + "_month";
+ } else if (interval == 180) {
+ long partition = now.getTime().getTime() / DatabaseConfig.QUARTER;
+ table = this.table + "_" + partition + "_quarter";
+ long partition2 = now.getTime().getTime() / DatabaseConfig.YEAR;
+ sumTable = this.table + "_" + partition2 + "_month";
+ } else if (interval == 720) {
+ long partition = now.getTime().getTime() / DatabaseConfig.YEAR;
+ table = this.table + "_" + partition + "_year";
+ long partition2 = now.getTime().getTime() / DatabaseConfig.DECADE;
+ sumTable = this.table + "_" + partition2 + "_month";
+ }
+ // Find the most recent entry
+ try {
+ String query = "select * from " + sumTable
+ + " order by timestamp desc limit 1";
+ log.debug("Query: " + query);
+ rs = db.query(query);
+ if (rs == null) {
+ throw new SQLException("Table is undefined.");
+ }
+ ResultSetMetaData rmeta = rs.getMetaData();
+ boolean empty = true;
+ if (rs.next()) {
+ for (int i = 1; i <= rmeta.getColumnCount(); i++) {
+ if (rmeta.getColumnName(i).toLowerCase().equals("timestamp")) {
+ start = rs.getTimestamp(i).getTime();
}
- long previousStart = start;
- long partition = 0;
- String timeWindowType="week";
- while(end < now.getTimeInMillis()-(interval*2*60000)) {
- // Select new data sample for the given intervals
- if(interval == 5) {
- timeWindowType="month";
- partition = start / DatabaseConfig.MONTH;
- } else if(interval == 30) {
- timeWindowType="quarter";
- partition = start / DatabaseConfig.QUARTER;
- } else if(interval == 180) {
- timeWindowType="year";
- partition = start / DatabaseConfig.YEAR;
- } else if(interval == 720) {
- timeWindowType="decade";
- partition = start / DatabaseConfig.DECADE;
- }
- SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- String startS = formatter.format(start);
- String endS = formatter.format(end);
- dateclause = "Timestamp >= '"+startS+"' and Timestamp <= '"+endS+"'";
- if(emptyPrimeKey) {
- groupBy = " group by FLOOR(UNIX_TIMESTAMP(TimeStamp)/"+interval*60+")";
- }
- String query = "replace into "+this.table+"_"+partition+"_"+timeWindowType+" (select "+fields+" from "+table+" where "+dateclause+groupBy+")";
- log.debug(query);
- db.execute(query);
- if(previousStart == start) {
- start = start + (interval*60000);
- end = start + (interval*60000);
- previousStart = start;
- }
- }
- db.close();
- }
- }
-
- public static void main(String[] args) {
- DataConfig mdl = new DataConfig();
- loader=new PidFile(System.getProperty("CLUSTER")+"Consolidator");
- HashMap<String, String> tableNames = (HashMap<String, String>) mdl.startWith("consolidator.table.");
+ }
+ empty = false;
+ }
+ if (empty) {
+ throw new SQLException("Table is empty.");
+ }
+ end = start + (interval * 60000);
+ } catch (SQLException ex) {
try {
- Iterator<String> ti = (tableNames.keySet()).iterator();
- while(ti.hasNext()) {
- String table = ti.next();
- String interval=mdl.get(table);
- table = table.substring(19);
- log.info("Summarizing table:"+table);
- Consolidator dbc = new Consolidator(table, interval);
- dbc.run();
+ String query = "select * from " + table
+ + " order by timestamp limit 1";
+ log.debug("Query: " + query);
+ rs = db.query(query);
+ if (rs.next()) {
+ ResultSetMetaData rmeta = rs.getMetaData();
+ for (int i = 1; i <= rmeta.getColumnCount(); i++) {
+ if (rmeta.getColumnName(i).toLowerCase().equals("timestamp")) {
+ start = rs.getTimestamp(i).getTime();
+ }
+ }
+ }
+ end = start + (interval * 60000);
+ } catch (SQLException ex2) {
+ log.error("Unable to determine starting point in table: "
+ + this.table);
+ log.error("SQL Error:" + ExceptionUtil.getStackTrace(ex2));
+ return;
+ }
+ }
+ try {
+ ResultSetMetaData rmeta = rs.getMetaData();
+ int col = rmeta.getColumnCount();
+ columns = new String[col];
+ columnsType = new int[col];
+ for (int i = 1; i <= col; i++) {
+ columns[i - 1] = rmeta.getColumnName(i);
+ columnsType[i - 1] = rmeta.getColumnType(i);
+ }
+
+ for (int i = 0; i < columns.length; i++) {
+ if (i == 0) {
+ fields = columns[i];
+ if (columnsType[i] == java.sql.Types.VARCHAR) {
+ if (groupBy.equals("")) {
+ groupBy = " group by " + columns[i];
+ } else {
+ groupBy = groupBy + "," + columns[i];
+ }
+ }
+ } else {
+ if (columnsType[i] == java.sql.Types.VARCHAR
+ || columnsType[i] == java.sql.Types.TIMESTAMP) {
+ fields = fields + "," + columns[i];
+ if (columnsType[i] == java.sql.Types.VARCHAR) {
+ if (groupBy.equals("")) {
+ groupBy = " group by " + columns[i];
+ } else {
+ groupBy = groupBy + "," + columns[i];
}
- } catch (NullPointerException e) {
- log.error("Unable to summarize database.");
- log.error("Error:"+ExceptionUtil.getStackTrace(e));
+ }
+ } else {
+ fields = fields + ",AVG(" + columns[i] + ") as " + columns[i];
+ }
+ }
+ }
+ } catch (SQLException ex) {
+ log.error("SQL Error:" + ExceptionUtil.getStackTrace(ex));
+ return;
+ }
+ if (groupBy.equals("")) {
+ emptyPrimeKey = true;
+ }
+ long previousStart = start;
+ long partition = 0;
+ String timeWindowType = "week";
+ while (end < now.getTimeInMillis() - (interval * 2 * 60000)) {
+ // Select new data sample for the given intervals
+ if (interval == 5) {
+ timeWindowType = "month";
+ partition = start / DatabaseConfig.MONTH;
+ } else if (interval == 30) {
+ timeWindowType = "quarter";
+ partition = start / DatabaseConfig.QUARTER;
+ } else if (interval == 180) {
+ timeWindowType = "year";
+ partition = start / DatabaseConfig.YEAR;
+ } else if (interval == 720) {
+ timeWindowType = "decade";
+ partition = start / DatabaseConfig.DECADE;
}
- loader.clean();
+ SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ String startS = formatter.format(start);
+ String endS = formatter.format(end);
+ dateclause = "Timestamp >= '" + startS + "' and Timestamp <= '" + endS
+ + "'";
+ if (emptyPrimeKey) {
+ groupBy = " group by FLOOR(UNIX_TIMESTAMP(TimeStamp)/" + interval
+ * 60 + ")";
+ }
+ String query = "replace into " + this.table + "_" + partition + "_"
+ + timeWindowType + " (select " + fields + " from " + table
+ + " where " + dateclause + groupBy + ")";
+ log.debug(query);
+ db.execute(query);
+ if (previousStart == start) {
+ start = start + (interval * 60000);
+ end = start + (interval * 60000);
+ previousStart = start;
+ }
+ }
+ db.close();
+ }
+ }
+
+ public static void main(String[] args) {
+ DataConfig mdl = new DataConfig();
+ loader = new PidFile(System.getProperty("CLUSTER") + "Consolidator");
+ HashMap<String, String> tableNames = (HashMap<String, String>) mdl
+ .startWith("consolidator.table.");
+ try {
+ Iterator<String> ti = (tableNames.keySet()).iterator();
+ while (ti.hasNext()) {
+ String table = ti.next();
+ String interval = mdl.get(table);
+ table = table.substring(19);
+ log.info("Summarizing table:" + table);
+ Consolidator dbc = new Consolidator(table, interval);
+ dbc.run();
+ }
+ } catch (NullPointerException e) {
+ log.error("Unable to summarize database.");
+ log.error("Error:" + ExceptionUtil.getStackTrace(e));
}
+ loader.clean();
+ }
}
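
As context for the class above: Consolidator walks a fixed set of summary intervals (5, 30, 180 and 720 minutes) and rolls rows from the week/month/quarter/year partition tables into the next coarser table. A minimal, illustrative driver follows; it is not part of this patch and assumes the DATACONFIG environment variable and the CLUSTER system property are set, as the code above requires:

    // Illustrative driver only; not part of this patch.
    // Assumes -DCLUSTER=<name> and the DATACONFIG environment variable are set.
    import org.apache.hadoop.chukwa.database.Consolidator;

    public class ConsolidatorDriver {
      public static void main(String[] args) {
        // The same interval list (in minutes) that run() special-cases: 5, 30, 180, 720.
        Consolidator c = new Consolidator("system_metrics", "5,30,180,720");
        c.run(); // invoked directly, exactly as Consolidator.main() does above
      }
    }
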
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/DataExpiration.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/DataExpiration.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/DataExpiration.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/DataExpiration.java Wed Mar 11 22:39:26 2009
@@ -18,89 +18,97 @@
package org.apache.hadoop.chukwa.database;
+
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.chukwa.util.DatabaseWriter;
public class DataExpiration {
- private static DatabaseConfig dbc = null;
- private static Log log = LogFactory.getLog(DataExpiration.class);
- public DataExpiration() {
- if(dbc==null) {
- dbc = new DatabaseConfig();
- }
+ private static DatabaseConfig dbc = null;
+ private static Log log = LogFactory.getLog(DataExpiration.class);
+
+ public DataExpiration() {
+ if (dbc == null) {
+ dbc = new DatabaseConfig();
+ }
+ }
+
+ public void dropTables(long start, long end) {
+ String cluster = System.getProperty("CLUSTER");
+ if (cluster == null) {
+ cluster = "unknown";
+ }
+ DatabaseWriter dbw = new DatabaseWriter(cluster);
+ try {
+ HashMap<String, String> dbNames = dbc.startWith("report.db.name.");
+ Iterator<String> ki = dbNames.keySet().iterator();
+ while (ki.hasNext()) {
+ String name = ki.next();
+ String tableName = dbNames.get(name);
+ String[] tableList = dbc.findTableName(tableName, start, end);
+ for (String tl : tableList) {
+ log.debug("table name: " + tableList[0]);
+ try {
+ String[] parts = tl.split("_");
+ int partition = Integer.parseInt(parts[parts.length - 2]);
+ String table = "";
+ for (int i = 0; i < parts.length - 2; i++) {
+ if (i != 0) {
+ table = table + "_";
+ }
+ table = table + parts[i];
+ }
+ partition = partition - 3;
+ String dropPartition = "drop table if exists " + table + "_"
+ + partition + "_" + parts[parts.length - 1];
+ dbw.execute(dropPartition);
+ partition--;
+ dropPartition = "drop table if exists " + table + "_" + partition
+ + "_" + parts[parts.length - 1];
+ dbw.execute(dropPartition);
+ } catch (NumberFormatException e) {
+ log
+ .error("Error in parsing table partition number, skipping table:"
+ + tableList[0]);
+ } catch (ArrayIndexOutOfBoundsException e) {
+ log.debug("Skipping table:" + tableList[0]
+ + ", because it has no partition configuration.");
+ }
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
}
- public void dropTables(long start, long end) {
- String cluster = System.getProperty("CLUSTER");
- if(cluster==null) {
- cluster="unknown";
- }
- DatabaseWriter dbw = new DatabaseWriter(cluster);
- try {
- HashMap<String, String> dbNames = dbc.startWith("report.db.name.");
- Iterator<String> ki = dbNames.keySet().iterator();
- while(ki.hasNext()) {
- String name = ki.next();
- String tableName = dbNames.get(name);
- String[] tableList = dbc.findTableName(tableName, start, end);
- for(String tl : tableList) {
- log.debug("table name: "+tableList[0]);
- try {
- String[] parts = tl.split("_");
- int partition = Integer.parseInt(parts[parts.length-2]);
- String table = "";
- for(int i=0;i<parts.length-2;i++) {
- if(i!=0) {
- table=table+"_";
- }
- table=table+parts[i];
- }
- partition=partition-3;
- String dropPartition="drop table if exists "+table+"_"+partition+"_"+parts[parts.length-1];
- dbw.execute(dropPartition);
- partition--;
- dropPartition="drop table if exists "+table+"_"+partition+"_"+parts[parts.length-1];
- dbw.execute(dropPartition);
- } catch(NumberFormatException e) {
- log.error("Error in parsing table partition number, skipping table:"+tableList[0]);
- } catch(ArrayIndexOutOfBoundsException e) {
- log.debug("Skipping table:"+tableList[0]+", because it has no partition configuration.");
- }
- }
- }
- } catch(Exception e) {
- e.printStackTrace();
- }
- }
-
- public static void usage() {
- System.out.println("DataExpiration usage:");
- System.out.println("java -jar chukwa-core.jar org.apache.hadoop.chukwa.DataExpiration <date> <time window size>");
- System.out.println(" date format: YYYY-MM-DD");
- System.out.println(" time window size: 7, 30, 91, 365");
- }
-
- public static void main(String[] args) {
- DataExpiration de = new DataExpiration();
- long now = (new Date()).getTime();
- long start = now;
- long end = now;
- if(args.length==2) {
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
- try {
- start = sdf.parse(args[0]).getTime();
- end = start + (Long.parseLong(args[1])*1440*60*1000L);
- de.dropTables(start, end);
- } catch(Exception e) {
- usage();
- }
- } else {
- usage();
- }
+ }
+
+ public static void usage() {
+ System.out.println("DataExpiration usage:");
+ System.out
+ .println("java -jar chukwa-core.jar org.apache.hadoop.chukwa.DataExpiration <date> <time window size>");
+ System.out.println(" date format: YYYY-MM-DD");
+ System.out.println(" time window size: 7, 30, 91, 365");
+ }
+
+ public static void main(String[] args) {
+ DataExpiration de = new DataExpiration();
+ long now = (new Date()).getTime();
+ long start = now;
+ long end = now;
+ if (args.length == 2) {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
+ try {
+ start = sdf.parse(args[0]).getTime();
+ end = start + (Long.parseLong(args[1]) * 1440 * 60 * 1000L);
+ de.dropTables(start, end);
+ } catch (Exception e) {
+ usage();
+ }
+ } else {
+ usage();
}
+ }
}
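
DataExpiration.main() parses a start date and a window size in days, then dropTables() removes the two partitions that precede the computed range for every table named under report.db.name. A rough programmatic equivalent of running "java -jar chukwa-core.jar org.apache.hadoop.chukwa.DataExpiration 2009-01-01 30", shown only for illustration:

    // Illustrative only: the programmatic equivalent of running
    // DataExpiration with "2009-01-01 30" (date, window size in days).
    import java.text.SimpleDateFormat;
    import org.apache.hadoop.chukwa.database.DataExpiration;

    public class DataExpirationDriver {
      public static void main(String[] args) throws Exception {
        DataExpiration de = new DataExpiration();
        long start = new SimpleDateFormat("yyyy-MM-dd").parse("2009-01-01").getTime();
        long end = start + (30L * 1440 * 60 * 1000); // 30-day window, mirroring main() above
        de.dropTables(start, end);
      }
    }
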
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/DatabaseConfig.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/DatabaseConfig.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/DatabaseConfig.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/DatabaseConfig.java Wed Mar 11 22:39:26 2009
@@ -18,227 +18,237 @@
package org.apache.hadoop.chukwa.database;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import java.util.*;
public class DatabaseConfig {
- private Configuration config = null;
- public final static long CENTURY=36500*24*60*60*1000L;
- public final static long DECADE=3650*24*60*60*1000L;
- public final static long YEAR=365*24*60*60*1000L;
- public final static long QUARTER=91250*24*60*60L;
- public final static long MONTH=30*24*60*60*1000L;
- public final static long WEEK=7*24*60*60*1000L;
- public final static long DAY=24*60*60*1000L;
-
- public DatabaseConfig(String path) {
- Path fileResource = new Path(path);
- config = new Configuration();
- config.addResource(fileResource);
- }
- public DatabaseConfig() {
- Path fileResource = new Path(System.getenv("DATACONFIG"));
- config = new Configuration();
- config.addResource(fileResource);
- }
-
- public String get(String key) {
- return config.get(key);
- }
- public void put(String key, String value) {
- this.config.set(key, value);
- }
- public Iterator<?> iterator() {
- return this.config.iterator();
- }
- public HashMap<String, String> startWith(String key) {
- HashMap<String, String> transformer = new HashMap<String, String>();
- Iterator<?> entries = config.iterator();
- while(entries.hasNext()) {
- String entry = entries.next().toString();
- if(entry.startsWith(key)) {
- String[] metrics = entry.split("=");
- transformer.put(metrics[0],metrics[1]);
- }
+ private Configuration config = null;
+ public final static long CENTURY = 36500 * 24 * 60 * 60 * 1000L;
+ public final static long DECADE = 3650 * 24 * 60 * 60 * 1000L;
+ public final static long YEAR = 365 * 24 * 60 * 60 * 1000L;
+ public final static long QUARTER = 91250 * 24 * 60 * 60L;
+ public final static long MONTH = 30 * 24 * 60 * 60 * 1000L;
+ public final static long WEEK = 7 * 24 * 60 * 60 * 1000L;
+ public final static long DAY = 24 * 60 * 60 * 1000L;
+
+ public DatabaseConfig(String path) {
+ Path fileResource = new Path(path);
+ config = new Configuration();
+ config.addResource(fileResource);
+ }
+
+ public DatabaseConfig() {
+ Path fileResource = new Path(System.getenv("DATACONFIG"));
+ config = new Configuration();
+ config.addResource(fileResource);
+ }
+
+ public String get(String key) {
+ return config.get(key);
+ }
+
+ public void put(String key, String value) {
+ this.config.set(key, value);
+ }
+
+ public Iterator<?> iterator() {
+ return this.config.iterator();
+ }
+
+ public HashMap<String, String> startWith(String key) {
+ HashMap<String, String> transformer = new HashMap<String, String>();
+ Iterator<?> entries = config.iterator();
+ while (entries.hasNext()) {
+ String entry = entries.next().toString();
+ if (entry.startsWith(key)) {
+ String[] metrics = entry.split("=");
+ transformer.put(metrics[0], metrics[1]);
+ }
+ }
+ return transformer;
+ }
+
+ public String[] findTableName(String tableName, long start, long end) {
+ String[] tableNames = null;
+ String tableType = "_week";
+ long now = (new Date()).getTime();
+ long timeWindow = end - start;
+ long partitionSize = WEEK;
+ boolean fallback = true;
+
+ if (config.get("consolidator.table." + tableName) == null) {
+ tableNames = new String[1];
+ tableNames[0] = tableName;
+ return tableNames;
+ }
+
+ if (timeWindow <= 0) {
+ timeWindow = 1;
+ }
+ if (timeWindow > DECADE) {
+ tableType = "_century";
+ partitionSize = CENTURY;
+ } else if (timeWindow > YEAR) {
+ tableType = "_decade";
+ partitionSize = DECADE;
+ } else if (timeWindow > QUARTER) {
+ tableType = "_year";
+ partitionSize = YEAR;
+ } else if (timeWindow > MONTH) {
+ tableType = "_quarter";
+ partitionSize = QUARTER;
+ } else if (timeWindow > WEEK) {
+ tableType = "_month";
+ partitionSize = MONTH;
+ } else {
+ tableType = "_week";
+ partitionSize = WEEK;
+ }
+
+ long currentPartition = now / partitionSize;
+ long startPartition = start / partitionSize;
+ long endPartition = end / partitionSize;
+ while (fallback && partitionSize != CENTURY * 100) {
+ // Check if the starting date is in the far distance from current time. If
+ // it is, use down sampled data.
+ if (startPartition + 2 < currentPartition) {
+ fallback = true;
+ if (partitionSize == DAY) {
+ tableType = "_week";
+ partitionSize = WEEK;
+ } else if (partitionSize == WEEK) {
+ tableType = "_month";
+ partitionSize = MONTH;
+ } else if (partitionSize == MONTH) {
+ tableType = "_year";
+ partitionSize = YEAR;
+ } else if (partitionSize == YEAR) {
+ tableType = "_decade";
+ partitionSize = DECADE;
+ } else if (partitionSize == DECADE) {
+ tableType = "_century";
+ partitionSize = CENTURY;
+ } else {
+ partitionSize = 100 * CENTURY;
+ }
+ currentPartition = now / partitionSize;
+ startPartition = start / partitionSize;
+ endPartition = end / partitionSize;
+ } else {
+ fallback = false;
+ }
+ }
+
+ if (startPartition != endPartition) {
+ int delta = (int) (endPartition - startPartition);
+ tableNames = new String[delta + 1];
+ for (int i = 0; i <= delta; i++) {
+ long partition = startPartition + (long) i;
+ tableNames[i] = tableName + "_" + partition + tableType;
+ }
+ } else {
+ tableNames = new String[1];
+ tableNames[0] = tableName + "_" + startPartition + tableType;
+ }
+ return tableNames;
+ }
+
+ public String[] findTableNameForCharts(String tableName, long start, long end) {
+ String[] tableNames = null;
+ String tableType = "_week";
+ long now = (new Date()).getTime();
+ long timeWindow = end - start;
+ if (timeWindow > 60 * 60 * 1000) {
+ timeWindow = timeWindow + 1;
+ }
+ long partitionSize = WEEK;
+ boolean fallback = true;
+
+ if (config.get("consolidator.table." + tableName) == null) {
+ tableNames = new String[1];
+ tableNames[0] = tableName;
+ return tableNames;
+ }
+
+ if (timeWindow <= 0) {
+ timeWindow = 1;
+ }
+ if (timeWindow > DECADE) {
+ tableType = "_decade";
+ partitionSize = CENTURY;
+ } else if (timeWindow > YEAR) {
+ tableType = "_decade";
+ partitionSize = CENTURY;
+ } else if (timeWindow > QUARTER) {
+ tableType = "_decade";
+ partitionSize = DECADE;
+ } else if (timeWindow > MONTH) {
+ tableType = "_year";
+ partitionSize = YEAR;
+ } else if (timeWindow > WEEK) {
+ tableType = "_quarter";
+ partitionSize = QUARTER;
+ } else if (timeWindow > DAY) {
+ tableType = "_month";
+ partitionSize = MONTH;
+ } else {
+ tableType = "_week";
+ partitionSize = WEEK;
+ }
+
+ long currentPartition = now / partitionSize;
+ long startPartition = start / partitionSize;
+ long endPartition = end / partitionSize;
+ while (fallback && partitionSize != DECADE * 100) {
+ // Check if the starting date is in the far distance from current time. If
+ // it is, use down sampled data.
+ if (startPartition + 2 < currentPartition) {
+ fallback = true;
+ if (partitionSize == DAY) {
+ tableType = "_month";
+ partitionSize = MONTH;
+ } else if (partitionSize == WEEK) {
+ tableType = "_quarter";
+ partitionSize = QUARTER;
+ } else if (partitionSize == MONTH) {
+ tableType = "_year";
+ partitionSize = YEAR;
+ } else if (partitionSize == YEAR) {
+ tableType = "_decade";
+ partitionSize = DECADE;
+ } else {
+ partitionSize = CENTURY;
}
- return transformer;
- }
- public String[] findTableName(String tableName, long start, long end) {
- String[] tableNames = null;
- String tableType = "_week";
- long now = (new Date()).getTime();
- long timeWindow = end - start;
- long partitionSize=WEEK;
- boolean fallback=true;
-
- if(config.get("consolidator.table."+tableName)==null) {
- tableNames = new String[1];
- tableNames[0]=tableName;
- return tableNames;
- }
-
- if(timeWindow<=0) {
- timeWindow=1;
- }
- if(timeWindow > DECADE) {
- tableType = "_century";
- partitionSize=CENTURY;
- } else if(timeWindow > YEAR) {
- tableType = "_decade";
- partitionSize=DECADE;
- } else if(timeWindow > QUARTER) {
- tableType = "_year";
- partitionSize=YEAR;
- } else if(timeWindow > MONTH) {
- tableType = "_quarter";
- partitionSize=QUARTER;
- } else if(timeWindow > WEEK) {
- tableType = "_month";
- partitionSize=MONTH;
- } else {
- tableType = "_week";
- partitionSize=WEEK;
- }
-
- long currentPartition = now / partitionSize;
- long startPartition = start / partitionSize;
- long endPartition = end / partitionSize;
- while(fallback && partitionSize!=CENTURY*100) {
- // Check if the starting date is in the far distance from current time. If it is, use down sampled data.
- if(startPartition + 2 < currentPartition) {
- fallback=true;
- if(partitionSize==DAY) {
- tableType = "_week";
- partitionSize=WEEK;
- } else if(partitionSize==WEEK) {
- tableType = "_month";
- partitionSize=MONTH;
- } else if(partitionSize==MONTH) {
- tableType = "_year";
- partitionSize=YEAR;
- } else if(partitionSize==YEAR) {
- tableType = "_decade";
- partitionSize=DECADE;
- } else if(partitionSize==DECADE) {
- tableType = "_century";
- partitionSize=CENTURY;
- } else {
- partitionSize=100*CENTURY;
- }
- currentPartition = now / partitionSize;
- startPartition = start / partitionSize;
- endPartition = end / partitionSize;
- } else {
- fallback=false;
- }
- }
-
- if(startPartition!=endPartition) {
- int delta = (int) (endPartition-startPartition);
- tableNames=new String[delta+1];
- for(int i=0;i<=delta;i++) {
- long partition = startPartition+(long)i;
- tableNames[i]=tableName+"_"+partition+tableType;
- }
- } else {
- tableNames=new String[1];
- tableNames[0]=tableName+"_"+startPartition+tableType;
- }
- return tableNames;
- }
- public String[] findTableNameForCharts(String tableName, long start, long end) {
- String[] tableNames = null;
- String tableType = "_week";
- long now = (new Date()).getTime();
- long timeWindow = end - start;
- if(timeWindow>60*60*1000) {
- timeWindow = timeWindow + 1;
- }
- long partitionSize=WEEK;
- boolean fallback=true;
-
- if(config.get("consolidator.table."+tableName)==null) {
- tableNames = new String[1];
- tableNames[0]=tableName;
- return tableNames;
- }
-
- if(timeWindow<=0) {
- timeWindow=1;
- }
- if(timeWindow > DECADE) {
- tableType = "_decade";
- partitionSize=CENTURY;
- } else if(timeWindow > YEAR) {
- tableType = "_decade";
- partitionSize=CENTURY;
- } else if(timeWindow > QUARTER) {
- tableType = "_decade";
- partitionSize=DECADE;
- } else if(timeWindow > MONTH) {
- tableType = "_year";
- partitionSize=YEAR;
- } else if(timeWindow > WEEK) {
- tableType = "_quarter";
- partitionSize=QUARTER;
- } else if(timeWindow > DAY) {
- tableType = "_month";
- partitionSize=MONTH;
- } else {
- tableType = "_week";
- partitionSize = WEEK;
- }
-
- long currentPartition = now / partitionSize;
- long startPartition = start / partitionSize;
- long endPartition = end / partitionSize;
- while(fallback && partitionSize!=DECADE*100) {
- // Check if the starting date is in the far distance from current time. If it is, use down sampled data.
- if(startPartition + 2 < currentPartition) {
- fallback=true;
- if(partitionSize==DAY) {
- tableType = "_month";
- partitionSize=MONTH;
- } else if(partitionSize==WEEK) {
- tableType = "_quarter";
- partitionSize=QUARTER;
- } else if(partitionSize==MONTH) {
- tableType = "_year";
- partitionSize=YEAR;
- } else if(partitionSize==YEAR) {
- tableType = "_decade";
- partitionSize=DECADE;
- } else {
- partitionSize=CENTURY;
- }
- currentPartition = now / partitionSize;
- startPartition = start / partitionSize;
- endPartition = end / partitionSize;
- } else {
- fallback=false;
- }
- }
-
- if(startPartition!=endPartition) {
- int delta = (int) (endPartition-startPartition);
- tableNames=new String[delta+1];
- for(int i=0;i<=delta;i++) {
- long partition = startPartition+(long)i;
- tableNames[i]=tableName+"_"+partition+tableType;
- }
- } else {
- tableNames=new String[1];
- tableNames[0]=tableName+"_"+startPartition+tableType;
- }
- return tableNames;
- }
-
- public static void main(String[] args) {
- DatabaseConfig dbc = new DatabaseConfig();
- String[] names = dbc.findTableName("system_metrics",1216140020000L,1218645620000L);
- for(String n: names) {
- System.out.println("name:"+n);
- }
+ currentPartition = now / partitionSize;
+ startPartition = start / partitionSize;
+ endPartition = end / partitionSize;
+ } else {
+ fallback = false;
+ }
+ }
+
+ if (startPartition != endPartition) {
+ int delta = (int) (endPartition - startPartition);
+ tableNames = new String[delta + 1];
+ for (int i = 0; i <= delta; i++) {
+ long partition = startPartition + (long) i;
+ tableNames[i] = tableName + "_" + partition + tableType;
+ }
+ } else {
+ tableNames = new String[1];
+ tableNames[0] = tableName + "_" + startPartition + tableType;
+ }
+ return tableNames;
+ }
+
+ public static void main(String[] args) {
+ DatabaseConfig dbc = new DatabaseConfig();
+ String[] names = dbc.findTableName("system_metrics", 1216140020000L,
+ 1218645620000L);
+ for (String n : names) {
+ System.out.println("name:" + n);
}
+ }
}
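
DatabaseConfig.findTableName() maps a logical table name plus a time range onto the partitioned tables (_week, _month, _quarter, _year, _decade, _century), falling back to coarser partitions when the range starts far in the past. An illustrative call, assuming DATACONFIG points at a configuration that defines consolidator.table.system_metrics (otherwise the bare table name is returned unchanged):

    // Illustrative only; assumes DATACONFIG points at a configuration that
    // defines consolidator.table.system_metrics.
    import org.apache.hadoop.chukwa.database.DatabaseConfig;

    public class FindTableNameDemo {
      public static void main(String[] args) {
        DatabaseConfig dbc = new DatabaseConfig();
        long end = System.currentTimeMillis();
        long start = end - DatabaseConfig.DAY; // one-day window
        // Ranges starting far in the past fall back to coarser partitions.
        for (String name : dbc.findTableName("system_metrics", start, end)) {
          System.out.println(name);
        }
      }
    }
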
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/MetricsAggregation.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/MetricsAggregation.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/MetricsAggregation.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/MetricsAggregation.java Wed Mar 11 22:39:26 2009
@@ -1,5 +1,6 @@
package org.apache.hadoop.chukwa.database;
+
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
@@ -10,150 +11,136 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-public class MetricsAggregation
-{
- private static Log log = LogFactory.getLog(MetricsAggregation.class);
- private static Connection conn = null;
- private static Statement stmt = null;
- private static ResultSet rs = null;
- private static DatabaseConfig mdlConfig;
-
- /**
- * @param args
- * @throws SQLException
- */
- public static void main(String[] args) throws SQLException
- {
- mdlConfig = new DatabaseConfig();
-
- // Connect to the database
- String jdbc_url = System.getenv("JDBC_URL_PREFIX")+mdlConfig.get("jdbc.host")+"/"+mdlConfig.get("jdbc.db");
- if(mdlConfig.get("jdbc.user")!=null) {
- jdbc_url = jdbc_url + "?user=" + mdlConfig.get("jdbc.user");
- if(mdlConfig.get("jdbc.password")!=null) {
- jdbc_url = jdbc_url + "&password=" + mdlConfig.get("jdbc.password");
- }
- }
- try {
- // The newInstance() call is a work around for some
- // broken Java implementations
- String jdbcDriver = System.getenv("JDBC_DRIVER");
- Class.forName(jdbcDriver).newInstance();
- log.info("Initialized JDBC URL: "+jdbc_url);
- } catch (Exception ex) {
- // handle the error
- ex.printStackTrace();
- log.error(ex,ex);
- }
- try {
- conn = DriverManager.getConnection(jdbc_url);
- } catch (SQLException ex)
- {
- ex.printStackTrace();
- log.error(ex,ex);
- }
-
- // get the latest timestamp for aggregation on this table
- // Start = latest
-
-
-
- SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-
- long start = System.currentTimeMillis() - (1000*60*60*24);
- long end = System.currentTimeMillis() - (1000*60*10);
- // retrieve metadata for cluster_system_metrics
- DatabaseConfig dbConf = new DatabaseConfig();
- String[] tables = dbConf.findTableName("cluster_system_metrics_2018_week", start, end);
- for(String table: tables)
- {
- System.out.println("Table to aggregate per Ts: " + table);
- stmt = conn.createStatement();
- rs = stmt.executeQuery("select table_ts from aggregation_admin_table where table_name=\""
- + table + "\"");
- if (rs.next())
- {
- start = rs.getLong(1);
- }
- else
- {
- start = 0;
- }
-
- end = start + (1000*60*60*1); // do 1 hour aggregation max
- long now = System.currentTimeMillis();
- now = now - (1000*60*10); // wait for 10 minutes
- end = Math.min(now, end);
-
- // TODO REMOVE DEBUG ONLY!
- end = now;
-
- System.out.println("Start Date:" + new Date(start));
- System.out.println("End Date:" + new Date(end));
-
- DatabaseMetaData dbm = conn.getMetaData ();
- rs = dbm.getColumns ( null,null,table, null);
-
- List<String> cols = new ArrayList<String>();
- while (rs.next ())
- {
- String s = rs.getString (4); // 4 is column name, 5 data type etc.
- System.out.println ("Name: " + s);
- int type = rs.getInt(5);
- if (type == java.sql.Types.VARCHAR)
- {
- System.out.println("Type: Varchar " + type);
- }
- else
- {
- cols.add(s);
- System.out.println("Type: Number " + type);
- }
- }// end of while.
-
- // build insert into from select query
- String initTable = table.replace("cluster_", "");
- StringBuilder sb0 = new StringBuilder();
- StringBuilder sb = new StringBuilder();
- sb0.append("insert into ").append(table).append(" (");
- sb.append(" ( select ");
- for (int i=0;i<cols.size();i++)
- {
- sb0.append(cols.get(i));
- sb.append("avg(").append(cols.get(i)).append(") ");
- if (i< cols.size()-1)
- {
- sb0.append(",");
- sb.append(",");
- }
- }
- sb.append(" from ").append(initTable);
- sb.append(" where timestamp between \"");
- sb.append(formatter.format(start));
- sb.append("\" and \"").append(formatter.format(end));
- sb.append("\" group by timestamp )");
-
-
- // close fields
- sb0.append(" )").append(sb);
- System.out.println(sb0.toString());
-
- // run query
- conn.setAutoCommit(false);
- stmt = conn.createStatement();
- stmt.execute(sb0.toString());
-
- // update last run
- stmt = conn.createStatement();
- stmt.execute("insert into aggregation_admin_table set table_ts=\"" + formatter.format(end) +
- "\" where table_name=\"" + table + "\"");
- conn.commit();
- }
-
- }
+public class MetricsAggregation {
+ private static Log log = LogFactory.getLog(MetricsAggregation.class);
+ private static Connection conn = null;
+ private static Statement stmt = null;
+ private static ResultSet rs = null;
+ private static DatabaseConfig mdlConfig;
+
+ /**
+ * @param args
+ * @throws SQLException
+ */
+ public static void main(String[] args) throws SQLException {
+ mdlConfig = new DatabaseConfig();
+
+ // Connect to the database
+ String jdbc_url = System.getenv("JDBC_URL_PREFIX")
+ + mdlConfig.get("jdbc.host") + "/" + mdlConfig.get("jdbc.db");
+ if (mdlConfig.get("jdbc.user") != null) {
+ jdbc_url = jdbc_url + "?user=" + mdlConfig.get("jdbc.user");
+ if (mdlConfig.get("jdbc.password") != null) {
+ jdbc_url = jdbc_url + "&password=" + mdlConfig.get("jdbc.password");
+ }
+ }
+ try {
+ // The newInstance() call is a work around for some
+ // broken Java implementations
+ String jdbcDriver = System.getenv("JDBC_DRIVER");
+ Class.forName(jdbcDriver).newInstance();
+ log.info("Initialized JDBC URL: " + jdbc_url);
+ } catch (Exception ex) {
+ // handle the error
+ ex.printStackTrace();
+ log.error(ex, ex);
+ }
+ try {
+ conn = DriverManager.getConnection(jdbc_url);
+ } catch (SQLException ex) {
+ ex.printStackTrace();
+ log.error(ex, ex);
+ }
+
+ // get the latest timestamp for aggregation on this table
+ // Start = latest
+
+ SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+ long start = System.currentTimeMillis() - (1000 * 60 * 60 * 24);
+ long end = System.currentTimeMillis() - (1000 * 60 * 10);
+ // retrieve metadata for cluster_system_metrics
+ DatabaseConfig dbConf = new DatabaseConfig();
+ String[] tables = dbConf.findTableName("cluster_system_metrics_2018_week",
+ start, end);
+ for (String table : tables) {
+ System.out.println("Table to aggregate per Ts: " + table);
+ stmt = conn.createStatement();
+ rs = stmt
+ .executeQuery("select table_ts from aggregation_admin_table where table_name=\""
+ + table + "\"");
+ if (rs.next()) {
+ start = rs.getLong(1);
+ } else {
+ start = 0;
+ }
+
+ end = start + (1000 * 60 * 60 * 1); // do 1 hour aggregation max
+ long now = System.currentTimeMillis();
+ now = now - (1000 * 60 * 10); // wait for 10 minutes
+ end = Math.min(now, end);
+
+ // TODO REMOVE DEBUG ONLY!
+ end = now;
+
+ System.out.println("Start Date:" + new Date(start));
+ System.out.println("End Date:" + new Date(end));
+
+ DatabaseMetaData dbm = conn.getMetaData();
+ rs = dbm.getColumns(null, null, table, null);
+
+ List<String> cols = new ArrayList<String>();
+ while (rs.next()) {
+ String s = rs.getString(4); // 4 is column name, 5 data type etc.
+ System.out.println("Name: " + s);
+ int type = rs.getInt(5);
+ if (type == java.sql.Types.VARCHAR) {
+ System.out.println("Type: Varchar " + type);
+ } else {
+ cols.add(s);
+ System.out.println("Type: Number " + type);
+ }
+ }// end of while.
+
+ // build insert into from select query
+ String initTable = table.replace("cluster_", "");
+ StringBuilder sb0 = new StringBuilder();
+ StringBuilder sb = new StringBuilder();
+ sb0.append("insert into ").append(table).append(" (");
+ sb.append(" ( select ");
+ for (int i = 0; i < cols.size(); i++) {
+ sb0.append(cols.get(i));
+ sb.append("avg(").append(cols.get(i)).append(") ");
+ if (i < cols.size() - 1) {
+ sb0.append(",");
+ sb.append(",");
+ }
+ }
+ sb.append(" from ").append(initTable);
+ sb.append(" where timestamp between \"");
+ sb.append(formatter.format(start));
+ sb.append("\" and \"").append(formatter.format(end));
+ sb.append("\" group by timestamp )");
+
+ // close fields
+ sb0.append(" )").append(sb);
+ System.out.println(sb0.toString());
+
+ // run query
+ conn.setAutoCommit(false);
+ stmt = conn.createStatement();
+ stmt.execute(sb0.toString());
+
+ // update last run
+ stmt = conn.createStatement();
+ stmt.execute("insert into aggregation_admin_table set table_ts=\""
+ + formatter.format(end) + "\" where table_name=\"" + table + "\"");
+ conn.commit();
+ }
+
+ }
}
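
MetricsAggregation builds, per partition table, an insert-into ... select avg(...) statement that averages every non-VARCHAR column over each timestamp. A self-contained sketch of that query-building step, with hypothetical column names, to make the shape of the generated SQL easier to read:

    // Self-contained sketch of the query-building loop above, using
    // hypothetical column names; prints the shape of the generated SQL.
    import java.util.Arrays;
    import java.util.List;

    public class AggregationQueryShape {
      public static void main(String[] args) {
        String table = "cluster_system_metrics_2018_week";
        String initTable = table.replace("cluster_", "");
        List<String> cols = Arrays.asList("timestamp", "load_15", "cpu_user"); // hypothetical numeric columns
        StringBuilder sb0 = new StringBuilder("insert into " + table + " (");
        StringBuilder sb = new StringBuilder(" ( select ");
        for (int i = 0; i < cols.size(); i++) {
          sb0.append(cols.get(i));
          sb.append("avg(").append(cols.get(i)).append(") ");
          if (i < cols.size() - 1) {
            sb0.append(",");
            sb.append(",");
          }
        }
        sb.append(" from ").append(initTable);
        sb.append(" where timestamp between \"2009-03-11 00:00:00\"");
        sb.append(" and \"2009-03-11 01:00:00\" group by timestamp )");
        sb0.append(" )").append(sb);
        System.out.println(sb0); // insert into cluster_... ( ... ) ( select avg(...) ... )
      }
    }
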
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/TableCreator.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/TableCreator.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/TableCreator.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/TableCreator.java Wed Mar 11 22:39:26 2009
@@ -15,112 +15,125 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+
package org.apache.hadoop.chukwa.database;
+
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.chukwa.util.DatabaseWriter;
public class TableCreator {
- private static DatabaseConfig dbc = null;
- private static Log log = LogFactory.getLog(TableCreator.class);
- public TableCreator() {
- if(dbc==null) {
- dbc = new DatabaseConfig();
- }
+ private static DatabaseConfig dbc = null;
+ private static Log log = LogFactory.getLog(TableCreator.class);
+
+ public TableCreator() {
+ if (dbc == null) {
+ dbc = new DatabaseConfig();
}
- public void createTables() {
- long now = (new Date()).getTime();
- createTables(now,now);
- }
- public void createTables(long start, long end) {
- String cluster = System.getProperty("CLUSTER");
- if(cluster==null) {
- cluster="unknown";
- }
- DatabaseWriter dbw = new DatabaseWriter(cluster);
- try {
- HashMap<String, String> dbNames = dbc.startWith("report.db.name.");
- Iterator<String> ki = dbNames.keySet().iterator();
- while(ki.hasNext()) {
- String name = ki.next();
- String tableName = dbNames.get(name);
- String[] tableList = dbc.findTableName(tableName, start, end);
- log.debug("table name: "+tableList[0]);
- try {
- String[] parts = tableList[0].split("_");
- int partition = Integer.parseInt(parts[parts.length-2]);
- String table = "";
- for(int i=0;i<parts.length-2;i++) {
- if(i!=0) {
- table=table+"_";
- }
- table=table+parts[i];
- }
- String query = "show create table "+table+"_template;";
- ResultSet rs = dbw.query(query);
- while(rs.next()) {
- log.debug("table schema: "+rs.getString(2));
- query=rs.getString(2);
- log.debug("template table name:"+table+"_template");
- log.debug("replacing with table name:"+table+"_"+partition+"_"+parts[parts.length-1]);
- log.debug("creating table: "+query);
- String createPartition=query.replaceFirst(table+"_template", table+"_"+partition+"_"+parts[parts.length-1]);
- createPartition=createPartition.replaceFirst("TABLE","TABLE IF NOT EXISTS");
- dbw.execute(createPartition);
- partition++;
- createPartition=query.replaceFirst(table+"_template", table+"_"+partition+"_"+parts[parts.length-1]);
- createPartition=createPartition.replaceFirst("TABLE","TABLE IF NOT EXISTS");
- dbw.execute(createPartition);
- partition++;
- createPartition=query.replaceFirst(table+"_template", table+"_"+partition+"_"+parts[parts.length-1]);
- createPartition=createPartition.replaceFirst("TABLE","TABLE IF NOT EXISTS");
- dbw.execute(createPartition);
- }
- } catch(NumberFormatException e) {
- log.error("Error in parsing table partition number, skipping table:"+tableList[0]);
- } catch(ArrayIndexOutOfBoundsException e) {
- log.debug("Skipping table:"+tableList[0]+", because it has no partition configuration.");
- } catch(SQLException e) {
-
- }
- }
- } catch(Exception e) {
- e.printStackTrace();
- }
- }
-
- public static void usage() {
- System.out.println("TableCreator usage:");
- System.out.println("java -jar chukwa-core.jar org.apache.hadoop.chukwa.TableCreator <date> <time window size>");
- System.out.println(" date format: YYYY-MM-DD");
- System.out.println(" time window size: 7, 30, 91, 365, 3650");
- }
-
- public static void main(String[] args) {
- TableCreator tc = new TableCreator();
- if(args.length==2) {
- try {
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
- long start = sdf.parse(args[0]).getTime();
- long end = start + (Long.parseLong(args[1])*1440*60*1000L);
- tc.createTables(start, end);
- } catch(Exception e) {
- System.out.println("Invalid date format or time window size.");
- e.printStackTrace();
- usage();
- }
- } else {
- tc.createTables();
- }
+ }
+
+ public void createTables() {
+ long now = (new Date()).getTime();
+ createTables(now, now);
+ }
+ public void createTables(long start, long end) {
+ String cluster = System.getProperty("CLUSTER");
+ if (cluster == null) {
+ cluster = "unknown";
}
+ DatabaseWriter dbw = new DatabaseWriter(cluster);
+ try {
+ HashMap<String, String> dbNames = dbc.startWith("report.db.name.");
+ Iterator<String> ki = dbNames.keySet().iterator();
+ while (ki.hasNext()) {
+ String name = ki.next();
+ String tableName = dbNames.get(name);
+ String[] tableList = dbc.findTableName(tableName, start, end);
+ log.debug("table name: " + tableList[0]);
+ try {
+ String[] parts = tableList[0].split("_");
+ int partition = Integer.parseInt(parts[parts.length - 2]);
+ String table = "";
+ for (int i = 0; i < parts.length - 2; i++) {
+ if (i != 0) {
+ table = table + "_";
+ }
+ table = table + parts[i];
+ }
+ String query = "show create table " + table + "_template;";
+ ResultSet rs = dbw.query(query);
+ while (rs.next()) {
+ log.debug("table schema: " + rs.getString(2));
+ query = rs.getString(2);
+ log.debug("template table name:" + table + "_template");
+ log.debug("replacing with table name:" + table + "_" + partition
+ + "_" + parts[parts.length - 1]);
+ log.debug("creating table: " + query);
+ String createPartition = query.replaceFirst(table + "_template",
+ table + "_" + partition + "_" + parts[parts.length - 1]);
+ createPartition = createPartition.replaceFirst("TABLE",
+ "TABLE IF NOT EXISTS");
+ dbw.execute(createPartition);
+ partition++;
+ createPartition = query.replaceFirst(table + "_template", table
+ + "_" + partition + "_" + parts[parts.length - 1]);
+ createPartition = createPartition.replaceFirst("TABLE",
+ "TABLE IF NOT EXISTS");
+ dbw.execute(createPartition);
+ partition++;
+ createPartition = query.replaceFirst(table + "_template", table
+ + "_" + partition + "_" + parts[parts.length - 1]);
+ createPartition = createPartition.replaceFirst("TABLE",
+ "TABLE IF NOT EXISTS");
+ dbw.execute(createPartition);
+ }
+ } catch (NumberFormatException e) {
+ log.error("Error in parsing table partition number, skipping table:"
+ + tableList[0]);
+ } catch (ArrayIndexOutOfBoundsException e) {
+ log.debug("Skipping table:" + tableList[0]
+ + ", because it has no partition configuration.");
+ } catch (SQLException e) {
+
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public static void usage() {
+ System.out.println("TableCreator usage:");
+ System.out
+ .println("java -jar chukwa-core.jar org.apache.hadoop.chukwa.TableCreator <date> <time window size>");
+ System.out.println(" date format: YYYY-MM-DD");
+ System.out.println(" time window size: 7, 30, 91, 365, 3650");
+ }
+
+ public static void main(String[] args) {
+ TableCreator tc = new TableCreator();
+ if (args.length == 2) {
+ try {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
+ long start = sdf.parse(args[0]).getTime();
+ long end = start + (Long.parseLong(args[1]) * 1440 * 60 * 1000L);
+ tc.createTables(start, end);
+ } catch (Exception e) {
+ System.out.println("Invalid date format or time window size.");
+ e.printStackTrace();
+ usage();
+ }
+ } else {
+ tc.createTables();
+ }
+
+ }
}
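
TableCreator clones each <table>_template schema into the computed partition table and the two that follow it. An illustrative programmatic equivalent of running "java -jar chukwa-core.jar org.apache.hadoop.chukwa.TableCreator 2009-03-11 30"; it assumes CLUSTER/DATACONFIG are set and the *_template tables exist:

    // Illustrative only: the programmatic equivalent of running TableCreator
    // with "2009-03-11 30"; requires CLUSTER, DATACONFIG and the *_template tables.
    import java.text.SimpleDateFormat;
    import org.apache.hadoop.chukwa.database.TableCreator;

    public class TableCreatorDriver {
      public static void main(String[] args) throws Exception {
        TableCreator tc = new TableCreator();
        long start = new SimpleDateFormat("yyyy-MM-dd").parse("2009-03-11").getTime();
        long end = start + (30L * 1440 * 60 * 1000); // 30-day window, as in main() above
        tc.createTables(start, end); // clones <table>_template into the partition tables
      }
    }
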
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/ChunkQueue.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/ChunkQueue.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/ChunkQueue.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/ChunkQueue.java Wed Mar 11 22:39:26 2009
@@ -18,8 +18,8 @@
package org.apache.hadoop.chukwa.datacollection;
-import java.util.List;
+import java.util.List;
import org.apache.hadoop.chukwa.Chunk;
/**
@@ -27,24 +27,25 @@
*
* Differs from a normal queue interface primarily by having collect().
*/
-public interface ChunkQueue extends ChunkReceiver
-{
+public interface ChunkQueue extends ChunkReceiver {
/**
- * Add a chunk to the queue, blocking if queue is full.
+ * Add a chunk to the queue, blocking if queue is full.
+ *
* @param chunk
* @throws InterruptedException if thread is interrupted while blocking
*/
- public void add(Chunk chunk) throws InterruptedException;
-
- /**
- * Return at least one, and no more than count, Chunks into chunks.
- * Blocks if queue is empty.
- */
- public void collect(List<Chunk> chunks,int count) throws InterruptedException;
-
- /**
- * Return an approximation of the number of chunks in the queue currently.
- * No guarantees are made about the accuracy of this number.
- */
- public int size();
+ public void add(Chunk chunk) throws InterruptedException;
+
+ /**
+ * Return at least one, and no more than count, Chunks into chunks. Blocks if
+ * queue is empty.
+ */
+ public void collect(List<Chunk> chunks, int count)
+ throws InterruptedException;
+
+ /**
+ * Return an approximation of the number of chunks in the queue currently. No
+ * guarantees are made about the accuracy of this number.
+ */
+ public int size();
}
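
A minimal consumer sketch against the ChunkQueue interface above; in the agent the concrete queue instance would come from DataFactory.getInstance().getEventQueue() (see the DataFactory diff further down). Illustrative only:

    // Illustrative consumer; the real queue instance would come from
    // DataFactory.getInstance().getEventQueue().
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.chukwa.Chunk;
    import org.apache.hadoop.chukwa.datacollection.ChunkQueue;

    public class QueueDrainer {
      public static void drainOnce(ChunkQueue queue) throws InterruptedException {
        List<Chunk> chunks = new ArrayList<Chunk>();
        queue.collect(chunks, 100); // blocks if empty; returns between 1 and 100 chunks
        System.out.println("collected " + chunks.size() + " chunks, ~" + queue.size() + " still queued");
      }
    }
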
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/ChunkReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/ChunkReceiver.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/ChunkReceiver.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/ChunkReceiver.java Wed Mar 11 22:39:26 2009
@@ -1,15 +1,16 @@
package org.apache.hadoop.chukwa.datacollection;
+
import org.apache.hadoop.chukwa.Chunk;
public interface ChunkReceiver {
-
+
/**
- * Add a chunk to the queue, potentially blocking.
+ * Add a chunk to the queue, potentially blocking.
+ *
* @param event
* @throws InterruptedException if thread is interrupted while blocking
*/
public void add(Chunk event) throws java.lang.InterruptedException;
-
}
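
For completeness, a trivial, illustrative ChunkReceiver implementation; adaptors deliver the chunks they produce to a receiver such as the agent's queue:

    // Illustrative only: a trivial receiver that counts the chunks it is handed.
    import org.apache.hadoop.chukwa.Chunk;
    import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;

    public class CountingReceiver implements ChunkReceiver {
      private long received = 0;

      public void add(Chunk event) throws InterruptedException {
        received++; // a real receiver would enqueue or forward the chunk
      }

      public long getReceived() {
        return received;
      }
    }
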
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java Wed Mar 11 22:39:26 2009
@@ -18,70 +18,67 @@
package org.apache.hadoop.chukwa.datacollection;
+
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
-
import org.apache.hadoop.chukwa.datacollection.sender.RetryListOfCollectors;
-
import org.apache.hadoop.chukwa.datacollection.agent.*;
import org.apache.log4j.Logger;
-public class DataFactory
-{
+public class DataFactory {
static Logger log = Logger.getLogger(DataFactory.class);
- static final int QUEUE_SIZE_KB = 10 * 1024;
- static final String COLLECTORS_FILENAME = "collectors";
- private static DataFactory dataFactory = null;
- private ChunkQueue chunkQueue = new MemLimitQueue(QUEUE_SIZE_KB * 1024);
-
- static
- {
- dataFactory = new DataFactory();
- }
-
- private DataFactory()
- {}
-
- public static DataFactory getInstance() {
- return dataFactory;
- }
-
- public ChunkQueue getEventQueue() {
- return chunkQueue;
- }
-
- /**
- * @return empty list if file does not exist
- * @throws IOException on other error
- */
- public Iterator<String> getCollectorURLs() throws IOException
- {
- String chukwaHome = System.getenv("CHUKWA_HOME");
- if (chukwaHome == null){
- chukwaHome = ".";
- }
-
- if(!chukwaHome.endsWith("/"))
- { chukwaHome = chukwaHome + File.separator; }
- log.info("Config - System.getenv(\"CHUKWA_HOME\"): [" + chukwaHome + "]" );
-
- String chukwaConf = System.getenv("CHUKWA_CONF_DIR");
- if (chukwaConf == null)
- {
- chukwaConf = chukwaHome + "conf" + File.separator;
- }
-
- log.info("Config - System.getenv(\"chukwaConf\"): [" + chukwaConf + "]" );
-
- log.info("setting up collectors file: " + chukwaConf +
- File.separator + COLLECTORS_FILENAME);
- File collectors = new File(chukwaConf + File.separator + "collectors");
- try{
- return new RetryListOfCollectors(collectors, 1000 * 15);//time is ms between tries
- } catch(java.io.IOException e) {
- log.error("failed to read collectors file: ", e);
- throw e;
- }
- }
+ static final int QUEUE_SIZE_KB = 10 * 1024;
+ static final String COLLECTORS_FILENAME = "collectors";
+ private static DataFactory dataFactory = null;
+ private ChunkQueue chunkQueue = new MemLimitQueue(QUEUE_SIZE_KB * 1024);
+
+ static {
+ dataFactory = new DataFactory();
+ }
+
+ private DataFactory() {
+ }
+
+ public static DataFactory getInstance() {
+ return dataFactory;
+ }
+
+ public ChunkQueue getEventQueue() {
+ return chunkQueue;
+ }
+
+ /**
+ * @return empty list if file does not exist
+ * @throws IOException on other error
+ */
+ public Iterator<String> getCollectorURLs() throws IOException {
+ String chukwaHome = System.getenv("CHUKWA_HOME");
+ if (chukwaHome == null) {
+ chukwaHome = ".";
+ }
+
+ if (!chukwaHome.endsWith("/")) {
+ chukwaHome = chukwaHome + File.separator;
+ }
+ log.info("Config - System.getenv(\"CHUKWA_HOME\"): [" + chukwaHome + "]");
+
+ String chukwaConf = System.getenv("CHUKWA_CONF_DIR");
+ if (chukwaConf == null) {
+ chukwaConf = chukwaHome + "conf" + File.separator;
+ }
+
+ log.info("Config - System.getenv(\"chukwaConf\"): [" + chukwaConf + "]");
+
+ log.info("setting up collectors file: " + chukwaConf + File.separator
+ + COLLECTORS_FILENAME);
+ File collectors = new File(chukwaConf + File.separator + "collectors");
+ try {
+ return new RetryListOfCollectors(collectors, 1000 * 15);// time is ms
+ // between tries
+ } catch (java.io.IOException e) {
+ log.error("failed to read collectors file: ", e);
+ throw e;
+ }
+ }
}
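
A brief usage sketch (not part of the commit) of the singleton above: obtaining the shared chunk queue and the collector list. It assumes CHUKWA_CONF_DIR (or CHUKWA_HOME) points at a conf directory containing a collectors file, and that ChunkQueue lives in the org.apache.hadoop.chukwa.datacollection package; the class DataFactoryDemo is hypothetical.

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
import org.apache.hadoop.chukwa.datacollection.DataFactory;

public class DataFactoryDemo {
  public static void main(String[] args) throws IOException {
    DataFactory factory = DataFactory.getInstance();

    // The shared, memory-bounded queue that adaptors write chunks into.
    ChunkQueue queue = factory.getEventQueue();
    System.out.println("event queue: " + queue);

    // Resolved from $CHUKWA_CONF_DIR/collectors, as in getCollectorURLs() above.
    Iterator<String> collectors = factory.getCollectorURLs();
    if (collectors.hasNext()) {
      System.out.println("first collector: " + collectors.next());
    }
  }
}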
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java Wed Mar 11 22:39:26 2009
@@ -17,76 +17,82 @@
*/
package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+
import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
/**
- * An adaptor is a component that runs within the Local Agent, producing
- * chunks of monitoring data.
+ * An adaptor is a component that runs within the Local Agent, producing chunks
+ * of monitoring data.
*
- * An adaptor can, but need not, have an associated thread. If an adaptor
- * lacks a thread, it needs to arrange some mechanism to periodically get control
- * and send reports such as a callback somewhere.
+ * An adaptor can, but need not, have an associated thread. If an adaptor lacks
+ * a thread, it needs to arrange some mechanism to periodically get control and
+ * send reports such as a callback somewhere.
*
- * Adaptors must be able to stop and resume without losing data, using
- * a byte offset in the stream.
+ * Adaptors must be able to stop and resume without losing data, using a byte
+ * offset in the stream.
*
* If an adaptor crashes at byte offset n, and is restarted at byte offset k,
- * with k < n, it is allowed to send different values for bytes k through n the
- * second time around. However, the stream must still be parseable, assuming that
- * bytes 0-k come from the first run,and bytes k - n come from the second.
+ * with k < n, it is allowed to send different values for bytes k through n the
+ * second time around. However, the stream must still be parseable, assuming
+ * that bytes 0-k come from the first run, and bytes k - n come from the second.
*/
-public interface Adaptor
-{
+public interface Adaptor {
/**
* Start this adaptor
+ *
* @param type the application type, who is starting this adaptor
* @param status the status string to use for configuration.
* @param offset the stream offset of the first byte sent by this adaptor
* @throws AdaptorException
*/
- public void start(long adaptorID, String type, String status, long offset, ChunkReceiver dest) throws AdaptorException;
-
- /**
- * Return the adaptor's state
- * Should not include class name, datatype or byte offset, which are written by caller.
- * @return the adaptor state as a string
- * @throws AdaptorException
- */
- public String getCurrentStatus() throws AdaptorException;
- public String getType();
-
- /**
- * Return the stream name
- * @return Stream name as a string
- */
- public String getStreamName();
- /**
- * Signals this adaptor to come to an orderly stop.
- * The adaptor ought to push out all the data it can
- * before exiting.
- *
- * This method is synchronous:
- * In other words, after shutdown() returns, no new data should be written.
- *
- * @return the logical offset at which the adaptor stops
- * @throws AdaptorException
- */
- public long shutdown() throws AdaptorException;
-
- /**
- * Signals this adaptor to come to an abrupt stop, as quickly as it can.
- * The use case here is "Whups, I didn't mean to start that adaptor tailing
- * a gigabyte file, stop it now".
- *
- * Adaptors might need to do something nontrivial here, e.g., in the case in which
- * they have registered periodic timer interrupts, or use a shared worker thread
- * from which they need to disengage.
- *
- * This method is synchronous:
- * In other words, after shutdown() returns, no new data should be written.
- *
- * @throws AdaptorException
- */
- public void hardStop() throws AdaptorException;
+ public void start(long adaptorID, String type, String status, long offset,
+ ChunkReceiver dest) throws AdaptorException;
+
+ /**
+ * Return the adaptor's state. Should not include class name, datatype or byte
+ * offset, which are written by caller.
+ *
+ * @return the adaptor state as a string
+ * @throws AdaptorException
+ */
+ public String getCurrentStatus() throws AdaptorException;
+
+ public String getType();
+
+ /**
+ * Return the stream name
+ *
+ * @return Stream name as a string
+ */
+ public String getStreamName();
+
+ /**
+ * Signals this adaptor to come to an orderly stop. The adaptor ought to push
+ * out all the data it can before exiting.
+ *
+ * This method is synchronous: In other words, after shutdown() returns, no
+ * new data should be written.
+ *
+ * @return the logical offset at which the adaptor stops
+ * @throws AdaptorException
+ */
+ public long shutdown() throws AdaptorException;
+
+ /**
+ * Signals this adaptor to come to an abrupt stop, as quickly as it can. The
+ * use case here is "Whups, I didn't mean to start that adaptor tailing a
+ * gigabyte file, stop it now".
+ *
+ * Adaptors might need to do something nontrivial here, e.g., in the case in
+ * which they have registered periodic timer interrupts, or use a shared
+ * worker thread from which they need to disengage.
+ *
+ * This method is synchronous: In other words, after shutdown() returns, no
+ * new data should be written.
+ *
+ * @throws AdaptorException
+ */
+ public void hardStop() throws AdaptorException;
}
\ No newline at end of file
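
To illustrate the contract documented above, the following is a deliberately trivial, hypothetical Adaptor that emits its status string once as a single chunk; it is a reading aid only and does not appear in this commit.

import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;

// Hypothetical example: a one-shot adaptor with nothing to resume.
public class ConstantAdaptor implements Adaptor {
  private String type;
  private String status;
  private volatile long offset;

  public void start(long adaptorID, String type, String status, long offset,
      ChunkReceiver dest) throws AdaptorException {
    this.type = type;
    this.status = status;
    byte[] data = status.getBytes();
    // Offsets are cumulative across restarts, per the interface contract.
    this.offset = offset + data.length;
    try {
      dest.add(new ChunkImpl(type, "constant-stream", this.offset, data, this));
    } catch (InterruptedException e) {
      throw new AdaptorException(e);
    }
  }

  public String getCurrentStatus() throws AdaptorException {
    return status; // caller prepends class name, datatype and offset
  }

  public String getType() {
    return type;
  }

  public String getStreamName() {
    return "constant-stream";
  }

  public long shutdown() throws AdaptorException {
    return offset; // nothing buffered, so the logical stop offset is final
  }

  public void hardStop() throws AdaptorException {
    // nothing to tear down: no thread, no timer, no open files
  }
}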
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AdaptorException.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AdaptorException.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AdaptorException.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AdaptorException.java Wed Mar 11 22:39:26 2009
@@ -18,29 +18,25 @@
package org.apache.hadoop.chukwa.datacollection.adaptor;
-public class AdaptorException extends Exception
-{
- private static final long serialVersionUID = -8490279345367308690L;
+public class AdaptorException extends Exception {
- public AdaptorException()
- {
- super();
- }
-
- public AdaptorException(String arg0, Throwable arg1)
- {
- super(arg0, arg1);
- }
-
- public AdaptorException(String arg0)
- {
- super(arg0);
- }
-
- public AdaptorException(Throwable arg0)
- {
- super(arg0);
- }
+ private static final long serialVersionUID = -8490279345367308690L;
+
+ public AdaptorException() {
+ super();
+ }
+
+ public AdaptorException(String arg0, Throwable arg1) {
+ super(arg0, arg1);
+ }
+
+ public AdaptorException(String arg0) {
+ super(arg0);
+ }
+
+ public AdaptorException(Throwable arg0) {
+ super(arg0);
+ }
}
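
A small, illustrative use of the constructors above (not from this commit): wrapping a lower-level failure so that Adaptor callers see one checked exception type with the root cause preserved.

import java.io.FileNotFoundException;
import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;

// Hypothetical helper, shown only to demonstrate the (message, cause) constructor.
public class AdaptorExceptionDemo {
  static void openInput(String path) throws AdaptorException {
    try {
      throw new FileNotFoundException(path); // stand-in for a real I/O failure
    } catch (FileNotFoundException e) {
      throw new AdaptorException("could not open " + path, e);
    }
  }
}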
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java Wed Mar 11 22:39:26 2009
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
import org.apache.hadoop.chukwa.inputtools.plugin.ExecPlugin;
@@ -27,14 +29,14 @@
import java.util.*;
/**
- * Runs a command inside chukwa. Takes as params the interval
- * in seconds at which to run the command, and the path and args
- * to execute.
+ * Runs a command inside chukwa. Takes as params the interval in seconds at
+ * which to run the command, and the path and args to execute.
*
* Interval is optional, and defaults to 5 seconds.
*
- * Example usage:
- * add org.apache.hadoop.chukwa.datacollection.adaptor.ExecAdaptor Ps 2 /bin/ps aux 0
+ * Example usage: add
+ * org.apache.hadoop.chukwa.datacollection.adaptor.ExecAdaptor Ps 2 /bin/ps aux
+ * 0
*
*/
public class ExecAdaptor extends ExecPlugin implements Adaptor {
@@ -42,77 +44,78 @@
static final boolean FAKE_LOG4J_HEADER = true;
static final boolean SPLIT_LINES = false;
protected long adaptorID = 0;
- static Logger log =Logger.getLogger(ExecAdaptor.class);
-
+ static Logger log = Logger.getLogger(ExecAdaptor.class);
+
class RunToolTask extends TimerTask {
public void run() {
JSONObject o = execute();
try {
-
- if(o.getInt("status") == statusKO)
+
+ if (o.getInt("status") == statusKO)
hardStop();
-
- //FIXME: downstream customers would like timestamps here.
- //Doing that efficiently probably means cutting out all the
- //excess buffer copies here, and appending into an OutputBuffer.
+
+ // FIXME: downstream customers would like timestamps here.
+ // Doing that efficiently probably means cutting out all the
+ // excess buffer copies here, and appending into an OutputBuffer.
byte[] data;
- if(FAKE_LOG4J_HEADER) {
+ if (FAKE_LOG4J_HEADER) {
StringBuilder result = new StringBuilder();
ISO8601DateFormat dateFormat = new org.apache.log4j.helpers.ISO8601DateFormat();
result.append(dateFormat.format(new java.util.Date()));
result.append(" INFO org.apache.hadoop.chukwa.");
result.append(type);
- result.append(": ");
+ result.append(": ");
result.append(o.getString("stdout"));
data = result.toString().getBytes();
} else {
String stdout = o.getString("stdout");
data = stdout.getBytes();
}
-
+
sendOffset += data.length;
- ChunkImpl c = new ChunkImpl(ExecAdaptor.this.type,
- "results from " + cmd, sendOffset , data, ExecAdaptor.this);
-
- if(SPLIT_LINES) {
- ArrayList<Integer> carriageReturns = new ArrayList<Integer>();
- for(int i = 0; i < data.length ; ++i)
- if(data[i] == '\n')
+ ChunkImpl c = new ChunkImpl(ExecAdaptor.this.type, "results from "
+ + cmd, sendOffset, data, ExecAdaptor.this);
+
+ if (SPLIT_LINES) {
+ ArrayList<Integer> carriageReturns = new ArrayList<Integer>();
+ for (int i = 0; i < data.length; ++i)
+ if (data[i] == '\n')
carriageReturns.add(i);
-
+
c.setRecordOffsets(carriageReturns);
- } //else we get default one record
-
+ } // else we get default one record
+
dest.add(c);
- } catch(JSONException e ) {
- //FIXME: log this somewhere
- } catch (InterruptedException e) {
+ } catch (JSONException e) {
+ // FIXME: log this somewhere
+ } catch (InterruptedException e) {
// TODO Auto-generated catch block
- }catch(AdaptorException e ) {
- //FIXME: log this somewhere
+ } catch (AdaptorException e) {
+ // FIXME: log this somewhere
}
}
};
-
+
String cmd;
String type;
ChunkReceiver dest;
final java.util.Timer timer;
long period = 5 * 1000;
volatile long sendOffset = 0;
-
+
public ExecAdaptor() {
timer = new java.util.Timer();
}
-
+
@Override
public String getCurrentStatus() throws AdaptorException {
return type + " " + period + " " + cmd + " " + sendOffset;
}
public String getStreamName() {
- return cmd;
+ return cmd;
}
+
@Override
public void hardStop() throws AdaptorException {
super.stop();
@@ -120,37 +123,37 @@
}
@Override
- public long shutdown() throws AdaptorException {
+ public long shutdown() throws AdaptorException {
try {
timer.cancel();
- super.waitFor(); //wait for last data to get pushed out
- } catch(InterruptedException e) {
- return sendOffset;
+ super.waitFor(); // wait for last data to get pushed out
+ } catch (InterruptedException e) {
+ return sendOffset;
}
return sendOffset;
}
@Override
- public void start(long adaptorID, String type, String status, long offset, ChunkReceiver dest)
- throws AdaptorException {
-
+ public void start(long adaptorID, String type, String status, long offset,
+ ChunkReceiver dest) throws AdaptorException {
+
int spOffset = status.indexOf(' ');
- if(spOffset > 0) {
- try {
- period = Integer.parseInt(status.substring(0, spOffset));
- cmd = status.substring(spOffset + 1);
- } catch(NumberFormatException e) {
- log.warn("ExecAdaptor: sample interval " + status.substring(0, spOffset) + " can't be parsed");
- cmd = status;
+ if (spOffset > 0) {
+ try {
+ period = Integer.parseInt(status.substring(0, spOffset));
+ cmd = status.substring(spOffset + 1);
+ } catch (NumberFormatException e) {
+ log.warn("ExecAdaptor: sample interval "
+ + status.substring(0, spOffset) + " can't be parsed");
+ cmd = status;
}
- }
- else
+ } else
cmd = status;
this.adaptorID = adaptorID;
this.type = type;
this.dest = dest;
this.sendOffset = offset;
-
+
TimerTask exec = new RunToolTask();
timer.schedule(exec, 0L, period);
}
@@ -159,7 +162,6 @@
public String getCmde() {
return cmd;
}
-
@Override
public String getType() {
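
Based on the Javadoc and the status-string parsing in start() above, here is a hedged sketch of wiring an ExecAdaptor by hand; in normal operation the agent does this via the "add ... ExecAdaptor Ps 2 /bin/ps aux 0" command shown in the class comment. The driver class and the println receiver are hypothetical.

import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
import org.apache.hadoop.chukwa.datacollection.adaptor.ExecAdaptor;

public class ExecAdaptorDemo {
  public static void main(String[] args) throws AdaptorException {
    ChunkReceiver printer = new ChunkReceiver() {
      public void add(Chunk event) throws InterruptedException {
        System.out.println("received one chunk of ps output");
      }
    };
    ExecAdaptor adaptor = new ExecAdaptor();
    // status = "<interval> <command>", as parsed in start() above
    adaptor.start(1L, "Ps", "2 /bin/ps aux", 0L, printer);
    // adaptor.shutdown() would cancel the timer and return the final offset
  }
}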
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java Wed Mar 11 22:39:26 2009
@@ -18,114 +18,118 @@
package org.apache.hadoop.chukwa.datacollection.adaptor;
+
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
-import org.apache.hadoop.chukwa.util.RecordConstants;
+import org.apache.hadoop.chukwa.util.RecordConstants;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
-
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
-
/**
 * File Adaptor pushes a small file in one chunk to the collector
*/
-public class FileAdaptor implements Adaptor
-{
+public class FileAdaptor implements Adaptor {
+
+ static Logger log;
- static Logger log;
+ protected static Configuration conf = null;
+ private int attempts = 0;
+
+ File toWatch;
+ /**
+ * next PHYSICAL offset to read
+ */
+ protected long fileReadOffset;
+ protected String type;
+ private ChunkReceiver dest;
+ protected RandomAccessFile reader = null;
+ protected long adaptorID;
+
+ /**
+ * The logical offset of the first byte of the file
+ */
+ private long offsetOfFirstByte = 0;
+
+ static {
+ log = Logger.getLogger(FileAdaptor.class);
+ }
+
+ public void start(long adaptorID, String type, String params, long bytes,
+ ChunkReceiver dest) {
+ // in this case params = filename
+ log.info("adaptor id: " + adaptorID + " started file adaptor on file "
+ + params);
+ this.adaptorID = adaptorID;
+ this.type = type;
+ this.dest = dest;
+ this.attempts = 0;
+
+ String[] words = params.split(" ");
+ if (words.length > 1) {
+ offsetOfFirstByte = Long.parseLong(words[0]);
+ toWatch = new File(params.substring(words[0].length() + 1));
+ } else {
+ toWatch = new File(params);
+ }
+ try {
+ reader = new RandomAccessFile(toWatch, "r");
+ long bufSize = toWatch.length();
+ byte[] buf = new byte[(int) bufSize];
+ reader.read(buf);
+ long fileTime = toWatch.lastModified();
+ int bytesUsed = extractRecords(dest, 0, buf, fileTime);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ ChukwaAgent agent = ChukwaAgent.getAgent();
+ if (agent != null) {
+ agent.stopAdaptor(adaptorID, false);
+ } else {
+ log.info("Agent is null, running in default mode");
+ }
+ this.fileReadOffset = bytes;
+ }
+
+ /**
+ * Do one last tail, and then stop
+ *
+ * @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#shutdown()
+ */
+ public long shutdown() throws AdaptorException {
+ hardStop();
+ return fileReadOffset + offsetOfFirstByte;
+ }
+
+ /**
+ * Stop tailing the file, effective immediately.
+ */
+ public void hardStop() throws AdaptorException {
+ }
+
+ public String getStreamName() {
+ return toWatch.getPath();
+ }
- protected static Configuration conf = null;
- private int attempts = 0;
-
- File toWatch;
- /**
- * next PHYSICAL offset to read
- */
- protected long fileReadOffset;
- protected String type;
- private ChunkReceiver dest;
- protected RandomAccessFile reader = null;
- protected long adaptorID;
-
- /**
- * The logical offset of the first byte of the file
- */
- private long offsetOfFirstByte = 0;
-
- static {
- log =Logger.getLogger(FileAdaptor.class);
- }
-
- public void start(long adaptorID, String type, String params, long bytes, ChunkReceiver dest) {
- //in this case params = filename
- log.info("adaptor id: "+adaptorID+" started file adaptor on file " + params);
- this.adaptorID = adaptorID;
- this.type = type;
- this.dest = dest;
- this.attempts = 0;
-
- String[] words = params.split(" ");
- if(words.length > 1) {
- offsetOfFirstByte = Long.parseLong(words[0]);
- toWatch = new File(params.substring(words[0].length() + 1));
- } else {
- toWatch = new File(params);
- }
- try {
- reader = new RandomAccessFile(toWatch, "r");
- long bufSize = toWatch.length();
- byte[] buf = new byte[(int) bufSize];
- reader.read(buf);
- long fileTime = toWatch.lastModified();
- int bytesUsed = extractRecords(dest, 0, buf, fileTime);
- } catch(Exception e) {
- e.printStackTrace();
- }
- ChukwaAgent agent = ChukwaAgent.getAgent();
- if (agent != null) {
- agent.stopAdaptor(adaptorID, false);
- } else {
- log.info("Agent is null, running in default mode");
- }
- this.fileReadOffset= bytes;
- }
-
- /**
- * Do one last tail, and then stop
- * @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#shutdown()
- */
- public long shutdown() throws AdaptorException {
- hardStop();
- return fileReadOffset + offsetOfFirstByte;
- }
- /**
- * Stop tailing the file, effective immediately.
- */
- public void hardStop() throws AdaptorException {
- }
-
- public String getStreamName() {
- return toWatch.getPath();
- }
-
/**
* Extract records from a byte sequence
+ *
* @param eq the queue to stick the new chunk[s] in
* @param buffOffsetInFile the byte offset in the stream at which buf[] begins
* @param buf the byte buffer to extract records from
* @return the number of bytes processed
* @throws InterruptedException
*/
- protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile, byte[] buf, long fileTime) throws InterruptedException {
- ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(), buffOffsetInFile + buf.length,
- buf, this);
+ protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile,
+ byte[] buf, long fileTime) throws InterruptedException {
+ ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(),
+ buffOffsetInFile + buf.length, buf, this);
String tags = chunk.getTags();
- chunk.setTags(tags+" time=\""+fileTime+"\"");
+ chunk.setTags(tags + " time=\"" + fileTime + "\"");
eq.add(chunk);
return buf.length;
}
@@ -137,7 +141,8 @@
@Override
public String getCurrentStatus() throws AdaptorException {
- return type.trim() + " " + offsetOfFirstByte+ " " + toWatch.getPath() + " " + fileReadOffset;
+ return type.trim() + " " + offsetOfFirstByte + " " + toWatch.getPath()
+ + " " + fileReadOffset;
}
-
+
}
\ No newline at end of file
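
For completeness, a hedged sketch of driving the FileAdaptor above directly: start() treats its params as "[offsetOfFirstByte] <filename>", reads the whole file into a single chunk tagged with the file's modification time, and then asks the agent (if one is running) to stop the adaptor. The file path, data type and receiver below are illustrative only.

import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
import org.apache.hadoop.chukwa.datacollection.adaptor.FileAdaptor;

public class FileAdaptorDemo {
  public static void main(String[] args) throws Exception {
    ChunkReceiver printer = new ChunkReceiver() {
      public void add(Chunk event) throws InterruptedException {
        System.out.println("received the file contents as one chunk");
      }
    };
    // params is a plain filename here, so offsetOfFirstByte defaults to 0
    new FileAdaptor().start(2L, "ExampleType", "/tmp/example.log", 0L, printer);
  }
}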