Posted to commits@chukwa.apache.org by as...@apache.org on 2009/03/11 23:39:32 UTC

svn commit: r752666 [2/16] - in /hadoop/chukwa/trunk: ./ src/java/org/apache/hadoop/chukwa/ src/java/org/apache/hadoop/chukwa/conf/ src/java/org/apache/hadoop/chukwa/database/ src/java/org/apache/hadoop/chukwa/datacollection/ src/java/org/apache/hadoop...

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/Consolidator.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/Consolidator.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/Consolidator.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/Consolidator.java Wed Mar 11 22:39:26 2009
@@ -18,13 +18,13 @@
 
 package org.apache.hadoop.chukwa.database;
 
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
 import org.apache.hadoop.chukwa.util.DatabaseWriter;
 import org.apache.hadoop.chukwa.util.ExceptionUtil;
 import org.apache.hadoop.chukwa.util.PidFile;
-
 import java.sql.SQLException;
 import java.sql.ResultSet;
 import java.util.Calendar;
@@ -34,223 +34,233 @@
 import java.text.SimpleDateFormat;
 
 public class Consolidator extends Thread {
-	private DatabaseConfig dbc = new DatabaseConfig();
+  private DatabaseConfig dbc = new DatabaseConfig();
 
-	private static Log log = LogFactory.getLog(Consolidator.class);
-	private String table = null;
-	private int[] intervals;
-    private static PidFile loader=null;
-
-	public Consolidator(String table, String intervalString) {
-		super(table);
-		try {
-			int i=0;
-			String[] temp = intervalString.split("\\,");
-			intervals = new int[temp.length];
-			for(String s: temp) {
-			    intervals[i]=Integer.parseInt(s);
-			    i++;
-			}
-			this.table = table;
-		} catch (NumberFormatException ex) {
-			log.error("Unable to parse summary interval");
-		}		
-	}
-	public void run() {
-		ResultSet rs = null;
-		String[] columns;
-		int[] columnsType;
-		String groupBy = "";
-        
-		for(int interval : intervals) {
-			// Start reducing from beginning of time;
-			Calendar aYearAgo = Calendar.getInstance();
-			aYearAgo.set(2008, 1, 1, 0, 0, 0);
-
-			long start = aYearAgo.getTimeInMillis();  //starting from 2008/01/01
-			long end = start + (interval*60000);
-			log.debug("start time: "+start);
-			log.debug("end time: "+end);
-			Calendar now = Calendar.getInstance();
-			String cluster = System.getProperty("CLUSTER");
-			if(cluster==null) {
-				cluster="unknown";
-			}
-			DatabaseWriter db = new DatabaseWriter(cluster);
-			String fields = null;
-			String dateclause = null;
-			boolean emptyPrimeKey = false;
-			log.info("Consolidate for "+interval+" minutes interval.");
-			
-			String[] tmpTable = dbc.findTableName(this.table, start, end);
-			String table = tmpTable[0];
-			String sumTable="";
-			if(interval==5) {
-				long partition=now.getTime().getTime() / DatabaseConfig.WEEK;
-				StringBuilder stringBuilder = new StringBuilder();
-				stringBuilder.append(this.table);
-				stringBuilder.append("_");
-				stringBuilder.append(partition);
-				stringBuilder.append("_week");
-				table=stringBuilder.toString();
-				long partition2=now.getTime().getTime() / DatabaseConfig.MONTH;
-				sumTable =this.table+"_"+partition2+"_month";
-			} else if(interval==30) {
-				long partition=now.getTime().getTime() / DatabaseConfig.MONTH;
-				table=this.table+"_"+partition+"_month";				
-				long partition2=now.getTime().getTime() / DatabaseConfig.QUARTER;
-				sumTable =this.table+"_"+partition2+"_month";
-			} else if(interval==180) {
-				long partition=now.getTime().getTime() / DatabaseConfig.QUARTER;
-				table=this.table+"_"+partition+"_quarter";
-				long partition2=now.getTime().getTime() / DatabaseConfig.YEAR;
-				sumTable =this.table+"_"+partition2+"_month";
-			} else if(interval==720) {
-				long partition=now.getTime().getTime() / DatabaseConfig.YEAR;
-				table=this.table+"_"+partition+"_year";
-				long partition2=now.getTime().getTime() / DatabaseConfig.DECADE;
-				sumTable =this.table+"_"+partition2+"_month";
-			}
-			// Find the most recent entry
-			try {
-			    String query = "select * from "+sumTable+" order by timestamp desc limit 1";
-	            log.debug("Query: "+query);
-	            rs = db.query(query);
-	            if(rs==null) {
-	          	    throw new SQLException("Table is undefined.");
-	            }
-	            ResultSetMetaData rmeta = rs.getMetaData();
-	            boolean empty=true;
-	            if(rs.next()) {
-	                for(int i=1;i<=rmeta.getColumnCount();i++) {
-		                if(rmeta.getColumnName(i).toLowerCase().equals("timestamp")) {
-		            	    start = rs.getTimestamp(i).getTime();
-		                }
-	                }
-	                empty=false;
-	            }
-	            if(empty) {
-	              	throw new SQLException("Table is empty.");
-	            }
-                end = start + (interval*60000);
-			} catch (SQLException ex) {
-			    try {
-				    String query = "select * from "+table+" order by timestamp limit 1";
-		            log.debug("Query: "+query);
-	                rs = db.query(query);
-	                if(rs.next()) {
-	    	            ResultSetMetaData rmeta = rs.getMetaData();
-	    	            for(int i=1;i<=rmeta.getColumnCount();i++) {
-	    	                if(rmeta.getColumnName(i).toLowerCase().equals("timestamp")) {
-	    	                	start = rs.getTimestamp(i).getTime();
-	    	                }
-	    	            }
-				    }
-                    end = start + (interval*60000);
-				} catch(SQLException ex2) {
-				    log.error("Unable to determine starting point in table: "+this.table);
-					log.error("SQL Error:"+ExceptionUtil.getStackTrace(ex2));
-					return;
-				}
-			}
-			try {
-                ResultSetMetaData rmeta = rs.getMetaData();
-                int col = rmeta.getColumnCount();
-                columns = new String[col];
-                columnsType = new int[col];
-                for(int i=1;i<=col;i++) {
-            	    columns[i-1]=rmeta.getColumnName(i);
-              	    columnsType[i-1]=rmeta.getColumnType(i);
-                }
+  private static Log log = LogFactory.getLog(Consolidator.class);
+  private String table = null;
+  private int[] intervals;
+  private static PidFile loader = null;
+
+  public Consolidator(String table, String intervalString) {
+    super(table);
+    try {
+      int i = 0;
+      String[] temp = intervalString.split("\\,");
+      intervals = new int[temp.length];
+      for (String s : temp) {
+        intervals[i] = Integer.parseInt(s);
+        i++;
+      }
+      this.table = table;
+    } catch (NumberFormatException ex) {
+      log.error("Unable to parse summary interval");
+    }
+  }
 
-		        for(int i=0;i<columns.length;i++) {
-		    	    if(i==0) {
-		    		    fields=columns[i];
-	    	            if(columnsType[i]==java.sql.Types.VARCHAR) {
-	    	            	if(groupBy.equals("")) {
-	    	            	    groupBy = " group by "+columns[i];
-	    	            	} else {
-		    	            	groupBy = groupBy+","+columns[i];	    	            		
-	    	            	}
-	    	            }
-		    	    } else {
-		    		    if(columnsType[i]==java.sql.Types.VARCHAR || columnsType[i]==java.sql.Types.TIMESTAMP) {
-		    	            fields=fields+","+columns[i];
-		    	            if(columnsType[i]==java.sql.Types.VARCHAR) {
-		    	            	if(groupBy.equals("")) {
-		    	            	    groupBy = " group by "+columns[i];
-		    	            	} else {
-		    	            	    groupBy = groupBy+","+columns[i];		    	            		
-		    	            	}
-		    	            }
-		    		    } else {
-		    	            fields=fields+",AVG("+columns[i]+") as "+columns[i];
-		    		    }
-		    	    }
-		        }
-			} catch(SQLException ex) {
-			  	log.error("SQL Error:"+ExceptionUtil.getStackTrace(ex));
-			  	return;
-			}
-            if(groupBy.equals("")) {
-            	emptyPrimeKey = true;
+  public void run() {
+    ResultSet rs = null;
+    String[] columns;
+    int[] columnsType;
+    String groupBy = "";
+
+    for (int interval : intervals) {
+      // Start reducing from beginning of time;
+      Calendar aYearAgo = Calendar.getInstance();
+      aYearAgo.set(2008, 1, 1, 0, 0, 0);
+
+      long start = aYearAgo.getTimeInMillis(); // starting from 2008/01/01
+      long end = start + (interval * 60000);
+      log.debug("start time: " + start);
+      log.debug("end time: " + end);
+      Calendar now = Calendar.getInstance();
+      String cluster = System.getProperty("CLUSTER");
+      if (cluster == null) {
+        cluster = "unknown";
+      }
+      DatabaseWriter db = new DatabaseWriter(cluster);
+      String fields = null;
+      String dateclause = null;
+      boolean emptyPrimeKey = false;
+      log.info("Consolidate for " + interval + " minutes interval.");
+
+      String[] tmpTable = dbc.findTableName(this.table, start, end);
+      String table = tmpTable[0];
+      String sumTable = "";
+      if (interval == 5) {
+        long partition = now.getTime().getTime() / DatabaseConfig.WEEK;
+        StringBuilder stringBuilder = new StringBuilder();
+        stringBuilder.append(this.table);
+        stringBuilder.append("_");
+        stringBuilder.append(partition);
+        stringBuilder.append("_week");
+        table = stringBuilder.toString();
+        long partition2 = now.getTime().getTime() / DatabaseConfig.MONTH;
+        sumTable = this.table + "_" + partition2 + "_month";
+      } else if (interval == 30) {
+        long partition = now.getTime().getTime() / DatabaseConfig.MONTH;
+        table = this.table + "_" + partition + "_month";
+        long partition2 = now.getTime().getTime() / DatabaseConfig.QUARTER;
+        sumTable = this.table + "_" + partition2 + "_month";
+      } else if (interval == 180) {
+        long partition = now.getTime().getTime() / DatabaseConfig.QUARTER;
+        table = this.table + "_" + partition + "_quarter";
+        long partition2 = now.getTime().getTime() / DatabaseConfig.YEAR;
+        sumTable = this.table + "_" + partition2 + "_month";
+      } else if (interval == 720) {
+        long partition = now.getTime().getTime() / DatabaseConfig.YEAR;
+        table = this.table + "_" + partition + "_year";
+        long partition2 = now.getTime().getTime() / DatabaseConfig.DECADE;
+        sumTable = this.table + "_" + partition2 + "_month";
+      }
+      // Find the most recent entry
+      try {
+        String query = "select * from " + sumTable
+            + " order by timestamp desc limit 1";
+        log.debug("Query: " + query);
+        rs = db.query(query);
+        if (rs == null) {
+          throw new SQLException("Table is undefined.");
+        }
+        ResultSetMetaData rmeta = rs.getMetaData();
+        boolean empty = true;
+        if (rs.next()) {
+          for (int i = 1; i <= rmeta.getColumnCount(); i++) {
+            if (rmeta.getColumnName(i).toLowerCase().equals("timestamp")) {
+              start = rs.getTimestamp(i).getTime();
             }
-			long previousStart = start;
-			long partition = 0;
-			String timeWindowType="week";
-        	while(end < now.getTimeInMillis()-(interval*2*60000)) {
-			    // Select new data sample for the given intervals
-			    if(interval == 5) {
-			    	timeWindowType="month";
-					partition = start / DatabaseConfig.MONTH;
-			    } else if(interval == 30) {
-			    	timeWindowType="quarter";
-					partition = start / DatabaseConfig.QUARTER;
-			    } else if(interval == 180) {
-			    	timeWindowType="year";
-					partition = start / DatabaseConfig.YEAR;
-			    } else if(interval == 720) {
-			    	timeWindowType="decade";
-					partition = start / DatabaseConfig.DECADE;
-			    }
-	            SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-			    String startS = formatter.format(start);
-			    String endS = formatter.format(end);
-			    dateclause = "Timestamp >= '"+startS+"' and Timestamp <= '"+endS+"'";
-			    if(emptyPrimeKey) {
-			    	groupBy = " group by FLOOR(UNIX_TIMESTAMP(TimeStamp)/"+interval*60+")";
-			    }
-				String query = "replace into "+this.table+"_"+partition+"_"+timeWindowType+" (select "+fields+" from "+table+" where "+dateclause+groupBy+")";
-				log.debug(query);
-                db.execute(query);
-        		if(previousStart == start) {
-        			start = start + (interval*60000);
-        			end = start + (interval*60000);
-            		previousStart = start;
-        		}
-        	}
-            db.close();
-		}
-	}
-
-    public static void main(String[] args) {
-        DataConfig mdl = new DataConfig();
-        loader=new PidFile(System.getProperty("CLUSTER")+"Consolidator");
-        HashMap<String, String> tableNames = (HashMap<String, String>) mdl.startWith("consolidator.table.");
+          }
+          empty = false;
+        }
+        if (empty) {
+          throw new SQLException("Table is empty.");
+        }
+        end = start + (interval * 60000);
+      } catch (SQLException ex) {
         try {
-                Iterator<String> ti = (tableNames.keySet()).iterator();
-                while(ti.hasNext()) {
-                        String table = ti.next();
-                String interval=mdl.get(table);
-                table = table.substring(19);
-                        log.info("Summarizing table:"+table);
-                Consolidator dbc = new Consolidator(table, interval);
-                dbc.run();
+          String query = "select * from " + table
+              + " order by timestamp limit 1";
+          log.debug("Query: " + query);
+          rs = db.query(query);
+          if (rs.next()) {
+            ResultSetMetaData rmeta = rs.getMetaData();
+            for (int i = 1; i <= rmeta.getColumnCount(); i++) {
+              if (rmeta.getColumnName(i).toLowerCase().equals("timestamp")) {
+                start = rs.getTimestamp(i).getTime();
+              }
+            }
+          }
+          end = start + (interval * 60000);
+        } catch (SQLException ex2) {
+          log.error("Unable to determine starting point in table: "
+              + this.table);
+          log.error("SQL Error:" + ExceptionUtil.getStackTrace(ex2));
+          return;
+        }
+      }
+      try {
+        ResultSetMetaData rmeta = rs.getMetaData();
+        int col = rmeta.getColumnCount();
+        columns = new String[col];
+        columnsType = new int[col];
+        for (int i = 1; i <= col; i++) {
+          columns[i - 1] = rmeta.getColumnName(i);
+          columnsType[i - 1] = rmeta.getColumnType(i);
+        }
+
+        for (int i = 0; i < columns.length; i++) {
+          if (i == 0) {
+            fields = columns[i];
+            if (columnsType[i] == java.sql.Types.VARCHAR) {
+              if (groupBy.equals("")) {
+                groupBy = " group by " + columns[i];
+              } else {
+                groupBy = groupBy + "," + columns[i];
+              }
+            }
+          } else {
+            if (columnsType[i] == java.sql.Types.VARCHAR
+                || columnsType[i] == java.sql.Types.TIMESTAMP) {
+              fields = fields + "," + columns[i];
+              if (columnsType[i] == java.sql.Types.VARCHAR) {
+                if (groupBy.equals("")) {
+                  groupBy = " group by " + columns[i];
+                } else {
+                  groupBy = groupBy + "," + columns[i];
                 }
-        } catch (NullPointerException e) {
-                log.error("Unable to summarize database.");
-                log.error("Error:"+ExceptionUtil.getStackTrace(e));
+              }
+            } else {
+              fields = fields + ",AVG(" + columns[i] + ") as " + columns[i];
+            }
+          }
+        }
+      } catch (SQLException ex) {
+        log.error("SQL Error:" + ExceptionUtil.getStackTrace(ex));
+        return;
+      }
+      if (groupBy.equals("")) {
+        emptyPrimeKey = true;
+      }
+      long previousStart = start;
+      long partition = 0;
+      String timeWindowType = "week";
+      while (end < now.getTimeInMillis() - (interval * 2 * 60000)) {
+        // Select new data sample for the given intervals
+        if (interval == 5) {
+          timeWindowType = "month";
+          partition = start / DatabaseConfig.MONTH;
+        } else if (interval == 30) {
+          timeWindowType = "quarter";
+          partition = start / DatabaseConfig.QUARTER;
+        } else if (interval == 180) {
+          timeWindowType = "year";
+          partition = start / DatabaseConfig.YEAR;
+        } else if (interval == 720) {
+          timeWindowType = "decade";
+          partition = start / DatabaseConfig.DECADE;
         }
-        loader.clean();
+        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        String startS = formatter.format(start);
+        String endS = formatter.format(end);
+        dateclause = "Timestamp >= '" + startS + "' and Timestamp <= '" + endS
+            + "'";
+        if (emptyPrimeKey) {
+          groupBy = " group by FLOOR(UNIX_TIMESTAMP(TimeStamp)/" + interval
+              * 60 + ")";
+        }
+        String query = "replace into " + this.table + "_" + partition + "_"
+            + timeWindowType + " (select " + fields + " from " + table
+            + " where " + dateclause + groupBy + ")";
+        log.debug(query);
+        db.execute(query);
+        if (previousStart == start) {
+          start = start + (interval * 60000);
+          end = start + (interval * 60000);
+          previousStart = start;
+        }
+      }
+      db.close();
+    }
+  }
+
+  public static void main(String[] args) {
+    DataConfig mdl = new DataConfig();
+    loader = new PidFile(System.getProperty("CLUSTER") + "Consolidator");
+    HashMap<String, String> tableNames = (HashMap<String, String>) mdl
+        .startWith("consolidator.table.");
+    try {
+      Iterator<String> ti = (tableNames.keySet()).iterator();
+      while (ti.hasNext()) {
+        String table = ti.next();
+        String interval = mdl.get(table);
+        table = table.substring(19);
+        log.info("Summarizing table:" + table);
+        Consolidator dbc = new Consolidator(table, interval);
+        dbc.run();
+      }
+    } catch (NullPointerException e) {
+      log.error("Unable to summarize database.");
+      log.error("Error:" + ExceptionUtil.getStackTrace(e));
     }
+    loader.clean();
+  }
 }
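
[Editor's note] The reformatted Consolidator derives its partitioned table names by integer-dividing the current time by a window size taken from DatabaseConfig. A minimal standalone sketch of that arithmetic for the 5-minute interval follows (the constants are copied from DatabaseConfig; the PartitionNameDemo class and the "system_metrics" base name are illustrative only, not part of the commit):

public class PartitionNameDemo {
  // Window sizes in milliseconds, as defined in DatabaseConfig.
  static final long WEEK = 7 * 24 * 60 * 60 * 1000L;
  static final long MONTH = 30 * 24 * 60 * 60 * 1000L;

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    String base = "system_metrics";
    // For interval == 5, Consolidator reads from the weekly partition and
    // writes its down-sampled rows into the corresponding monthly partition.
    String sourceTable = base + "_" + (now / WEEK) + "_week";
    String targetTable = base + "_" + (now / MONTH) + "_month";
    System.out.println("source: " + sourceTable);
    System.out.println("target: " + targetTable);
  }
}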

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/DataExpiration.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/DataExpiration.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/DataExpiration.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/DataExpiration.java Wed Mar 11 22:39:26 2009
@@ -18,89 +18,97 @@
 
 package org.apache.hadoop.chukwa.database;
 
+
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.chukwa.util.DatabaseWriter;
 
 public class DataExpiration {
-	private static DatabaseConfig dbc = null;
-	private static Log log = LogFactory.getLog(DataExpiration.class);		
-	public DataExpiration() {
-    	if(dbc==null) {
-    	    dbc = new DatabaseConfig();
-    	}
+  private static DatabaseConfig dbc = null;
+  private static Log log = LogFactory.getLog(DataExpiration.class);
+
+  public DataExpiration() {
+    if (dbc == null) {
+      dbc = new DatabaseConfig();
+    }
+  }
+
+  public void dropTables(long start, long end) {
+    String cluster = System.getProperty("CLUSTER");
+    if (cluster == null) {
+      cluster = "unknown";
+    }
+    DatabaseWriter dbw = new DatabaseWriter(cluster);
+    try {
+      HashMap<String, String> dbNames = dbc.startWith("report.db.name.");
+      Iterator<String> ki = dbNames.keySet().iterator();
+      while (ki.hasNext()) {
+        String name = ki.next();
+        String tableName = dbNames.get(name);
+        String[] tableList = dbc.findTableName(tableName, start, end);
+        for (String tl : tableList) {
+          log.debug("table name: " + tableList[0]);
+          try {
+            String[] parts = tl.split("_");
+            int partition = Integer.parseInt(parts[parts.length - 2]);
+            String table = "";
+            for (int i = 0; i < parts.length - 2; i++) {
+              if (i != 0) {
+                table = table + "_";
+              }
+              table = table + parts[i];
+            }
+            partition = partition - 3;
+            String dropPartition = "drop table if exists " + table + "_"
+                + partition + "_" + parts[parts.length - 1];
+            dbw.execute(dropPartition);
+            partition--;
+            dropPartition = "drop table if exists " + table + "_" + partition
+                + "_" + parts[parts.length - 1];
+            dbw.execute(dropPartition);
+          } catch (NumberFormatException e) {
+            log
+                .error("Error in parsing table partition number, skipping table:"
+                    + tableList[0]);
+          } catch (ArrayIndexOutOfBoundsException e) {
+            log.debug("Skipping table:" + tableList[0]
+                + ", because it has no partition configuration.");
+          }
+        }
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
     }
-	public void dropTables(long start, long end) {
-		String cluster = System.getProperty("CLUSTER");
-		if(cluster==null) {
-			cluster="unknown";
-		}
-		DatabaseWriter dbw = new DatabaseWriter(cluster);
-		try {
-			HashMap<String, String> dbNames = dbc.startWith("report.db.name.");
-			Iterator<String> ki = dbNames.keySet().iterator();
-			while(ki.hasNext()) {
-				String name = ki.next();
-				String tableName = dbNames.get(name);
-				String[] tableList = dbc.findTableName(tableName, start, end);
-				for(String tl : tableList) {
-					log.debug("table name: "+tableList[0]);
-					try {
-						String[] parts = tl.split("_");
-						int partition = Integer.parseInt(parts[parts.length-2]);
-						String table = "";
-						for(int i=0;i<parts.length-2;i++) {
-							if(i!=0) {
-								table=table+"_";
-							}
-							table=table+parts[i];
-						}
-						partition=partition-3;
-						String dropPartition="drop table if exists "+table+"_"+partition+"_"+parts[parts.length-1];
-						dbw.execute(dropPartition);
-						partition--;
-						dropPartition="drop table if exists "+table+"_"+partition+"_"+parts[parts.length-1];
-						dbw.execute(dropPartition);
-					} catch(NumberFormatException e) {
-						log.error("Error in parsing table partition number, skipping table:"+tableList[0]);
-					} catch(ArrayIndexOutOfBoundsException e) {
-						log.debug("Skipping table:"+tableList[0]+", because it has no partition configuration.");
-					}
-				}
-			}
-		} catch(Exception e) {
-			e.printStackTrace();
-		}		
-	}
-	
-	public static void usage() {
-		System.out.println("DataExpiration usage:");
-		System.out.println("java -jar chukwa-core.jar org.apache.hadoop.chukwa.DataExpiration <date> <time window size>");
-		System.out.println("     date format: YYYY-MM-DD");
-		System.out.println("     time window size: 7, 30, 91, 365");		
-	}
-	
-	public static void main(String[] args) {
-		DataExpiration de = new DataExpiration();
-		long now = (new Date()).getTime();
-		long start = now;
-		long end = now;
-		if(args.length==2) {
-			SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
-			try {
-				start = sdf.parse(args[0]).getTime();				
-				end = start + (Long.parseLong(args[1])*1440*60*1000L);
-				de.dropTables(start, end);				
-			} catch(Exception e) {
-				usage();
-			}
-		} else {
-			usage();
-		}
+  }
+
+  public static void usage() {
+    System.out.println("DataExpiration usage:");
+    System.out
+        .println("java -jar chukwa-core.jar org.apache.hadoop.chukwa.DataExpiration <date> <time window size>");
+    System.out.println("     date format: YYYY-MM-DD");
+    System.out.println("     time window size: 7, 30, 91, 365");
+  }
+
+  public static void main(String[] args) {
+    DataExpiration de = new DataExpiration();
+    long now = (new Date()).getTime();
+    long start = now;
+    long end = now;
+    if (args.length == 2) {
+      SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
+      try {
+        start = sdf.parse(args[0]).getTime();
+        end = start + (Long.parseLong(args[1]) * 1440 * 60 * 1000L);
+        de.dropTables(start, end);
+      } catch (Exception e) {
+        usage();
+      }
+    } else {
+      usage();
     }
+  }
 }
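
[Editor's note] DataExpiration.dropTables() above recovers the base name and partition number by splitting the table name on underscores, then drops the partitions three and four steps behind it. A small self-contained sketch of that parsing, under the assumption of an illustrative table name:

public class ExpirationTargetDemo {
  public static void main(String[] args) {
    String tl = "system_metrics_2038_week";   // illustrative partitioned table name
    String[] parts = tl.split("_");
    int partition = Integer.parseInt(parts[parts.length - 2]);
    String table = "";
    for (int i = 0; i < parts.length - 2; i++) {
      if (i != 0) {
        table = table + "_";
      }
      table = table + parts[i];
    }
    // Same offsets as dropTables(): drop the partition three back, then four back.
    partition = partition - 3;
    System.out.println("drop table if exists " + table + "_" + partition
        + "_" + parts[parts.length - 1]);
    partition--;
    System.out.println("drop table if exists " + table + "_" + partition
        + "_" + parts[parts.length - 1]);
  }
}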

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/DatabaseConfig.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/DatabaseConfig.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/DatabaseConfig.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/DatabaseConfig.java Wed Mar 11 22:39:26 2009
@@ -18,227 +18,237 @@
 
 package org.apache.hadoop.chukwa.database;
 
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import java.util.*;
 
 public class DatabaseConfig {
-    private Configuration config = null;
-	public final static long CENTURY=36500*24*60*60*1000L;
-    public final static long DECADE=3650*24*60*60*1000L;
-    public final static long YEAR=365*24*60*60*1000L;
-    public final static long QUARTER=91250*24*60*60L;
-    public final static long MONTH=30*24*60*60*1000L;
-	public final static long WEEK=7*24*60*60*1000L;
-	public final static long DAY=24*60*60*1000L;
-
-    public DatabaseConfig(String path) {
-        Path fileResource = new Path(path);
-        config = new Configuration();
-        config.addResource(fileResource);
-    }
-    public DatabaseConfig() {
-        Path fileResource = new Path(System.getenv("DATACONFIG"));
-        config = new Configuration();
-        config.addResource(fileResource);
-    }
-
-    public String get(String key) {
-        return config.get(key);
-    }
-    public void put(String key, String value) {
-        this.config.set(key, value);
-    }
-    public Iterator<?> iterator() {
-        return this.config.iterator();
-    }
-    public HashMap<String, String> startWith(String key) {
-        HashMap<String, String> transformer = new HashMap<String, String>();
-        Iterator<?> entries = config.iterator();
-        while(entries.hasNext()) {
-           String entry = entries.next().toString();
-           if(entry.startsWith(key)) {
-               String[] metrics = entry.split("=");
-               transformer.put(metrics[0],metrics[1]);
-           }
+  private Configuration config = null;
+  public final static long CENTURY = 36500 * 24 * 60 * 60 * 1000L;
+  public final static long DECADE = 3650 * 24 * 60 * 60 * 1000L;
+  public final static long YEAR = 365 * 24 * 60 * 60 * 1000L;
+  public final static long QUARTER = 91250 * 24 * 60 * 60L;
+  public final static long MONTH = 30 * 24 * 60 * 60 * 1000L;
+  public final static long WEEK = 7 * 24 * 60 * 60 * 1000L;
+  public final static long DAY = 24 * 60 * 60 * 1000L;
+
+  public DatabaseConfig(String path) {
+    Path fileResource = new Path(path);
+    config = new Configuration();
+    config.addResource(fileResource);
+  }
+
+  public DatabaseConfig() {
+    Path fileResource = new Path(System.getenv("DATACONFIG"));
+    config = new Configuration();
+    config.addResource(fileResource);
+  }
+
+  public String get(String key) {
+    return config.get(key);
+  }
+
+  public void put(String key, String value) {
+    this.config.set(key, value);
+  }
+
+  public Iterator<?> iterator() {
+    return this.config.iterator();
+  }
+
+  public HashMap<String, String> startWith(String key) {
+    HashMap<String, String> transformer = new HashMap<String, String>();
+    Iterator<?> entries = config.iterator();
+    while (entries.hasNext()) {
+      String entry = entries.next().toString();
+      if (entry.startsWith(key)) {
+        String[] metrics = entry.split("=");
+        transformer.put(metrics[0], metrics[1]);
+      }
+    }
+    return transformer;
+  }
+
+  public String[] findTableName(String tableName, long start, long end) {
+    String[] tableNames = null;
+    String tableType = "_week";
+    long now = (new Date()).getTime();
+    long timeWindow = end - start;
+    long partitionSize = WEEK;
+    boolean fallback = true;
+
+    if (config.get("consolidator.table." + tableName) == null) {
+      tableNames = new String[1];
+      tableNames[0] = tableName;
+      return tableNames;
+    }
+
+    if (timeWindow <= 0) {
+      timeWindow = 1;
+    }
+    if (timeWindow > DECADE) {
+      tableType = "_century";
+      partitionSize = CENTURY;
+    } else if (timeWindow > YEAR) {
+      tableType = "_decade";
+      partitionSize = DECADE;
+    } else if (timeWindow > QUARTER) {
+      tableType = "_year";
+      partitionSize = YEAR;
+    } else if (timeWindow > MONTH) {
+      tableType = "_quarter";
+      partitionSize = QUARTER;
+    } else if (timeWindow > WEEK) {
+      tableType = "_month";
+      partitionSize = MONTH;
+    } else {
+      tableType = "_week";
+      partitionSize = WEEK;
+    }
+
+    long currentPartition = now / partitionSize;
+    long startPartition = start / partitionSize;
+    long endPartition = end / partitionSize;
+    while (fallback && partitionSize != CENTURY * 100) {
+      // Check if the starting date is in the far distance from current time. If
+      // it is, use down sampled data.
+      if (startPartition + 2 < currentPartition) {
+        fallback = true;
+        if (partitionSize == DAY) {
+          tableType = "_week";
+          partitionSize = WEEK;
+        } else if (partitionSize == WEEK) {
+          tableType = "_month";
+          partitionSize = MONTH;
+        } else if (partitionSize == MONTH) {
+          tableType = "_year";
+          partitionSize = YEAR;
+        } else if (partitionSize == YEAR) {
+          tableType = "_decade";
+          partitionSize = DECADE;
+        } else if (partitionSize == DECADE) {
+          tableType = "_century";
+          partitionSize = CENTURY;
+        } else {
+          partitionSize = 100 * CENTURY;
+        }
+        currentPartition = now / partitionSize;
+        startPartition = start / partitionSize;
+        endPartition = end / partitionSize;
+      } else {
+        fallback = false;
+      }
+    }
+
+    if (startPartition != endPartition) {
+      int delta = (int) (endPartition - startPartition);
+      tableNames = new String[delta + 1];
+      for (int i = 0; i <= delta; i++) {
+        long partition = startPartition + (long) i;
+        tableNames[i] = tableName + "_" + partition + tableType;
+      }
+    } else {
+      tableNames = new String[1];
+      tableNames[0] = tableName + "_" + startPartition + tableType;
+    }
+    return tableNames;
+  }
+
+  public String[] findTableNameForCharts(String tableName, long start, long end) {
+    String[] tableNames = null;
+    String tableType = "_week";
+    long now = (new Date()).getTime();
+    long timeWindow = end - start;
+    if (timeWindow > 60 * 60 * 1000) {
+      timeWindow = timeWindow + 1;
+    }
+    long partitionSize = WEEK;
+    boolean fallback = true;
+
+    if (config.get("consolidator.table." + tableName) == null) {
+      tableNames = new String[1];
+      tableNames[0] = tableName;
+      return tableNames;
+    }
+
+    if (timeWindow <= 0) {
+      timeWindow = 1;
+    }
+    if (timeWindow > DECADE) {
+      tableType = "_decade";
+      partitionSize = CENTURY;
+    } else if (timeWindow > YEAR) {
+      tableType = "_decade";
+      partitionSize = CENTURY;
+    } else if (timeWindow > QUARTER) {
+      tableType = "_decade";
+      partitionSize = DECADE;
+    } else if (timeWindow > MONTH) {
+      tableType = "_year";
+      partitionSize = YEAR;
+    } else if (timeWindow > WEEK) {
+      tableType = "_quarter";
+      partitionSize = QUARTER;
+    } else if (timeWindow > DAY) {
+      tableType = "_month";
+      partitionSize = MONTH;
+    } else {
+      tableType = "_week";
+      partitionSize = WEEK;
+    }
+
+    long currentPartition = now / partitionSize;
+    long startPartition = start / partitionSize;
+    long endPartition = end / partitionSize;
+    while (fallback && partitionSize != DECADE * 100) {
+      // Check if the starting date is in the far distance from current time. If
+      // it is, use down sampled data.
+      if (startPartition + 2 < currentPartition) {
+        fallback = true;
+        if (partitionSize == DAY) {
+          tableType = "_month";
+          partitionSize = MONTH;
+        } else if (partitionSize == WEEK) {
+          tableType = "_quarter";
+          partitionSize = QUARTER;
+        } else if (partitionSize == MONTH) {
+          tableType = "_year";
+          partitionSize = YEAR;
+        } else if (partitionSize == YEAR) {
+          tableType = "_decade";
+          partitionSize = DECADE;
+        } else {
+          partitionSize = CENTURY;
         }
-        return transformer;
-    }    
-    public String[] findTableName(String tableName, long start, long end) {
-    	String[] tableNames = null;
-    	String tableType = "_week";
-		long now = (new Date()).getTime();
-		long timeWindow = end - start;
-		long partitionSize=WEEK;
-		boolean fallback=true;
-		
-		if(config.get("consolidator.table."+tableName)==null) {
-			tableNames = new String[1];
-			tableNames[0]=tableName;
-			return tableNames;
-		}
-		
-		if(timeWindow<=0) {
-			timeWindow=1;			
-		}
-		if(timeWindow > DECADE) {
-			tableType = "_century";
-			partitionSize=CENTURY;
-		} else if(timeWindow > YEAR) {
-			tableType = "_decade";
-			partitionSize=DECADE;
-		} else if(timeWindow > QUARTER) {
-			tableType = "_year";
-			partitionSize=YEAR;			
-		} else if(timeWindow > MONTH) {
-			tableType = "_quarter";
-			partitionSize=QUARTER;
-		} else if(timeWindow > WEEK) {
-			tableType = "_month";
-			partitionSize=MONTH;
-		} else {
-			tableType = "_week";
-			partitionSize=WEEK;
-		}
-
-		long currentPartition = now / partitionSize;
-		long startPartition = start / partitionSize;
-		long endPartition = end / partitionSize;
-		while(fallback && partitionSize!=CENTURY*100) {
-			// Check if the starting date is in the far distance from current time.  If it is, use down sampled data.
-			if(startPartition + 2 < currentPartition) {
-				fallback=true;
-			    if(partitionSize==DAY) {
-				    tableType = "_week";
-				    partitionSize=WEEK;
-			    } else if(partitionSize==WEEK) {
-				    tableType = "_month";
-				    partitionSize=MONTH;
-			    } else if(partitionSize==MONTH) {
-				    tableType = "_year";
-				    partitionSize=YEAR;
-			    } else if(partitionSize==YEAR) {
-					tableType = "_decade";
-					partitionSize=DECADE;				
-				} else if(partitionSize==DECADE) {
-					tableType = "_century";
-					partitionSize=CENTURY;
-				} else {
-					partitionSize=100*CENTURY;
-				}
-				currentPartition = now / partitionSize;
-				startPartition = start / partitionSize;
-				endPartition = end / partitionSize;
-			} else {
-				fallback=false;
-			}
-		}
-
-		if(startPartition!=endPartition) {
-			int delta = (int) (endPartition-startPartition);
-			tableNames=new String[delta+1];
-			for(int i=0;i<=delta;i++) {
-				long partition = startPartition+(long)i;
-				tableNames[i]=tableName+"_"+partition+tableType;
-			}
-		} else {
-			tableNames=new String[1];
-			tableNames[0]=tableName+"_"+startPartition+tableType;
-		}
-    	return tableNames;
-    }
-    public String[] findTableNameForCharts(String tableName, long start, long end) {
-    	String[] tableNames = null;
-    	String tableType = "_week";
-		long now = (new Date()).getTime();
-		long timeWindow = end - start;
-		if(timeWindow>60*60*1000) {
-		    timeWindow = timeWindow + 1;
-		}
-		long partitionSize=WEEK;
-		boolean fallback=true;
-		
-		if(config.get("consolidator.table."+tableName)==null) {
-			tableNames = new String[1];
-			tableNames[0]=tableName;
-			return tableNames;
-		}
-		
-		if(timeWindow<=0) {
-			timeWindow=1;			
-		}
-		if(timeWindow > DECADE) {
-			tableType = "_decade";
-			partitionSize=CENTURY;			
-		} else if(timeWindow > YEAR) {
-			tableType = "_decade";
-			partitionSize=CENTURY;
-		} else if(timeWindow > QUARTER) {
-			tableType = "_decade";
-			partitionSize=DECADE;
-		} else if(timeWindow > MONTH) {
-			tableType = "_year";
-			partitionSize=YEAR;
-		} else if(timeWindow > WEEK) {
-			tableType = "_quarter";
-			partitionSize=QUARTER;
-		} else if(timeWindow > DAY) {
-			tableType = "_month";
-			partitionSize=MONTH;			
-		} else {
-			tableType = "_week";
-			partitionSize = WEEK;
-		}
-
-		long currentPartition = now / partitionSize;
-		long startPartition = start / partitionSize;
-		long endPartition = end / partitionSize;
-		while(fallback && partitionSize!=DECADE*100) {
-			// Check if the starting date is in the far distance from current time.  If it is, use down sampled data.
-			if(startPartition + 2 < currentPartition) {
-				fallback=true;
-			    if(partitionSize==DAY) {
-				    tableType = "_month";
-				    partitionSize=MONTH;
-			    } else if(partitionSize==WEEK) {
-				    tableType = "_quarter";
-				    partitionSize=QUARTER;
-			    } else if(partitionSize==MONTH) {
-				    tableType = "_year";
-				    partitionSize=YEAR;
-			    } else if(partitionSize==YEAR) {
-					tableType = "_decade";
-					partitionSize=DECADE;				
-				} else {
-					partitionSize=CENTURY;
-				}
-				currentPartition = now / partitionSize;
-				startPartition = start / partitionSize;
-				endPartition = end / partitionSize;
-			} else {
-				fallback=false;
-			}
-		}
-
-		if(startPartition!=endPartition) {
-			int delta = (int) (endPartition-startPartition);
-			tableNames=new String[delta+1];
-			for(int i=0;i<=delta;i++) {
-				long partition = startPartition+(long)i;
-				tableNames[i]=tableName+"_"+partition+tableType;
-			}
-		} else {
-			tableNames=new String[1];
-			tableNames[0]=tableName+"_"+startPartition+tableType;
-		}
-    	return tableNames;
-    }
-    
-    public static void main(String[] args) {
-    	DatabaseConfig dbc = new DatabaseConfig();
-    	String[] names = dbc.findTableName("system_metrics",1216140020000L,1218645620000L);
-    	for(String n: names) {
-    		System.out.println("name:"+n);
-    	}
+        currentPartition = now / partitionSize;
+        startPartition = start / partitionSize;
+        endPartition = end / partitionSize;
+      } else {
+        fallback = false;
+      }
+    }
+
+    if (startPartition != endPartition) {
+      int delta = (int) (endPartition - startPartition);
+      tableNames = new String[delta + 1];
+      for (int i = 0; i <= delta; i++) {
+        long partition = startPartition + (long) i;
+        tableNames[i] = tableName + "_" + partition + tableType;
+      }
+    } else {
+      tableNames = new String[1];
+      tableNames[0] = tableName + "_" + startPartition + tableType;
+    }
+    return tableNames;
+  }
+
+  public static void main(String[] args) {
+    DatabaseConfig dbc = new DatabaseConfig();
+    String[] names = dbc.findTableName("system_metrics", 1216140020000L,
+        1218645620000L);
+    for (String n : names) {
+      System.out.println("name:" + n);
     }
+  }
 }
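
[Editor's note] findTableName() expands a query window into one table name per partition the window spans, after choosing a partition size from the window length. A small sketch of just that expansion step (the expand() helper and the hard-coded weekly partition are assumptions for illustration, not part of the class):

import java.util.ArrayList;
import java.util.List;

public class TableNameExpansionDemo {
  static final long WEEK = 7 * 24 * 60 * 60 * 1000L;

  // Mirrors the final loop of findTableName(): one name per partition
  // between the start and end timestamps, suffixed with the table type.
  static List<String> expand(String base, long start, long end) {
    List<String> names = new ArrayList<String>();
    long startPartition = start / WEEK;
    long endPartition = end / WEEK;
    for (long p = startPartition; p <= endPartition; p++) {
      names.add(base + "_" + p + "_week");
    }
    return names;
  }

  public static void main(String[] args) {
    // A multi-week window crosses partition boundaries, so several names come back.
    long start = 1216140020000L;   // same start timestamp as the main() above
    long end = start + 2 * WEEK;
    System.out.println(expand("system_metrics", start, end));
  }
}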

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/MetricsAggregation.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/MetricsAggregation.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/MetricsAggregation.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/MetricsAggregation.java Wed Mar 11 22:39:26 2009
@@ -1,5 +1,6 @@
 package org.apache.hadoop.chukwa.database;
 
+
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.DriverManager;
@@ -10,150 +11,136 @@
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-public class MetricsAggregation
-{
-	 private static Log log = LogFactory.getLog(MetricsAggregation.class);
-	 private static Connection conn = null;    
-     private static Statement stmt = null; 
-     private static ResultSet rs = null; 
-     private static DatabaseConfig mdlConfig;
-     
-	/**
-	 * @param args
-	 * @throws SQLException 
-	 */
-	public static void main(String[] args) throws SQLException
-	{
-	       mdlConfig = new DatabaseConfig();
-		
-	       // Connect to the database
-	       String jdbc_url = System.getenv("JDBC_URL_PREFIX")+mdlConfig.get("jdbc.host")+"/"+mdlConfig.get("jdbc.db");
-	       if(mdlConfig.get("jdbc.user")!=null) {
-	           jdbc_url = jdbc_url + "?user=" + mdlConfig.get("jdbc.user");
-	           if(mdlConfig.get("jdbc.password")!=null) {
-	               jdbc_url = jdbc_url + "&password=" + mdlConfig.get("jdbc.password");
-	           }
-	       }
-	       try {
-	           // The newInstance() call is a work around for some
-	           // broken Java implementations
-                   String jdbcDriver = System.getenv("JDBC_DRIVER");
-	           Class.forName(jdbcDriver).newInstance();
-	           log.info("Initialized JDBC URL: "+jdbc_url);
-	       } catch (Exception ex) {
-	           // handle the error
-	    	   ex.printStackTrace();
-	           log.error(ex,ex);
-	       }
-	       try {
-	           conn = DriverManager.getConnection(jdbc_url);
-	       } catch (SQLException ex) 
-	       {
-	    	   ex.printStackTrace();
-	           log.error(ex,ex);
-	       }      
-	       
-	       // get the latest timestamp for aggregation on this table
-		   // Start = latest
-	       
-	      
-	       
-	       SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-	        
-	       long start = System.currentTimeMillis() - (1000*60*60*24);
-	       long end = System.currentTimeMillis() - (1000*60*10);
-	       // retrieve metadata for cluster_system_metrics
-	       DatabaseConfig dbConf = new DatabaseConfig();
-	       String[] tables = dbConf.findTableName("cluster_system_metrics_2018_week", start, end);
-	       for(String table: tables)
-	       {
-	    	   System.out.println("Table to aggregate per Ts: " + table);
-	    	   stmt = conn.createStatement();
-	    	   rs = stmt.executeQuery("select table_ts from aggregation_admin_table where table_name=\"" 
-	    			   + table + "\"");
-			   if (rs.next())
-			   {
-				   start = rs.getLong(1);
-			   }
-			   else
-			   {
-				   start = 0;
-			   }
-			   
-			   end = start + (1000*60*60*1); // do 1 hour aggregation max 
-			   long now = System.currentTimeMillis();
-			   now = now - (1000*60*10); // wait for 10 minutes
-			   end = Math.min(now, end);
-		     
-			   // TODO REMOVE DEBUG ONLY!
-			   end = now;
-			   
-			   System.out.println("Start Date:" + new Date(start));
-			   System.out.println("End Date:" + new Date(end));
-			   
-		       DatabaseMetaData dbm = conn.getMetaData ();
-		       rs = dbm.getColumns ( null,null,table, null);
-		      	
-		       List<String> cols = new ArrayList<String>();
-		       while (rs.next ())
-		       {
-		          	String s = rs.getString (4); // 4 is column name, 5 data type etc. 
-		          	System.out.println ("Name: " + s);
-		          	int type = rs.getInt(5);
-		          	if (type == java.sql.Types.VARCHAR)
-		          	{
-		          		System.out.println("Type: Varchar " + type);
-		          	}
-		          	else
-		          	{
-		          		cols.add(s);
-		          		System.out.println("Type: Number " + type);
-		          	}
-		       }// end of while.
-		       
-		       // build insert into from select query
-		       String initTable = table.replace("cluster_", "");
-		       StringBuilder sb0 = new StringBuilder();
-		       StringBuilder sb = new StringBuilder();
-		       sb0.append("insert into ").append(table).append(" (");
-		       sb.append(" ( select ");
-		       for (int i=0;i<cols.size();i++)
-		       {
-		    	   sb0.append(cols.get(i));
-		    	   sb.append("avg(").append(cols.get(i)).append(") ");
-		    	   if (i< cols.size()-1)
-		    	   {
-		    		   sb0.append(",");
-		    		   sb.append(",");
-		    	   }
-		       }
-			   sb.append(" from ").append(initTable);
-			   sb.append(" where timestamp between \"");
-			   sb.append(formatter.format(start));
-			   sb.append("\" and \"").append(formatter.format(end));
-			   sb.append("\" group by timestamp  )");
-			  
-		        
-			   // close fields
-			   sb0.append(" )").append(sb);
-			   System.out.println(sb0.toString());
-			   
-			   // run query
-			   conn.setAutoCommit(false);
-			   stmt = conn.createStatement();
-			   stmt.execute(sb0.toString());
-			   
-			   // update last run
-			   stmt = conn.createStatement();
-			   stmt.execute("insert into aggregation_admin_table set table_ts=\"" +  formatter.format(end) +
-					   "\" where table_name=\"" + table + "\"");
-			   conn.commit();
-	       }
-	
-	}
+public class MetricsAggregation {
+  private static Log log = LogFactory.getLog(MetricsAggregation.class);
+  private static Connection conn = null;
+  private static Statement stmt = null;
+  private static ResultSet rs = null;
+  private static DatabaseConfig mdlConfig;
+
+  /**
+   * @param args
+   * @throws SQLException
+   */
+  public static void main(String[] args) throws SQLException {
+    mdlConfig = new DatabaseConfig();
+
+    // Connect to the database
+    String jdbc_url = System.getenv("JDBC_URL_PREFIX")
+        + mdlConfig.get("jdbc.host") + "/" + mdlConfig.get("jdbc.db");
+    if (mdlConfig.get("jdbc.user") != null) {
+      jdbc_url = jdbc_url + "?user=" + mdlConfig.get("jdbc.user");
+      if (mdlConfig.get("jdbc.password") != null) {
+        jdbc_url = jdbc_url + "&password=" + mdlConfig.get("jdbc.password");
+      }
+    }
+    try {
+      // The newInstance() call is a work around for some
+      // broken Java implementations
+      String jdbcDriver = System.getenv("JDBC_DRIVER");
+      Class.forName(jdbcDriver).newInstance();
+      log.info("Initialized JDBC URL: " + jdbc_url);
+    } catch (Exception ex) {
+      // handle the error
+      ex.printStackTrace();
+      log.error(ex, ex);
+    }
+    try {
+      conn = DriverManager.getConnection(jdbc_url);
+    } catch (SQLException ex) {
+      ex.printStackTrace();
+      log.error(ex, ex);
+    }
+
+    // get the latest timestamp for aggregation on this table
+    // Start = latest
+
+    SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+    long start = System.currentTimeMillis() - (1000 * 60 * 60 * 24);
+    long end = System.currentTimeMillis() - (1000 * 60 * 10);
+    // retrieve metadata for cluster_system_metrics
+    DatabaseConfig dbConf = new DatabaseConfig();
+    String[] tables = dbConf.findTableName("cluster_system_metrics_2018_week",
+        start, end);
+    for (String table : tables) {
+      System.out.println("Table to aggregate per Ts: " + table);
+      stmt = conn.createStatement();
+      rs = stmt
+          .executeQuery("select table_ts from aggregation_admin_table where table_name=\""
+              + table + "\"");
+      if (rs.next()) {
+        start = rs.getLong(1);
+      } else {
+        start = 0;
+      }
+
+      end = start + (1000 * 60 * 60 * 1); // do 1 hour aggregation max
+      long now = System.currentTimeMillis();
+      now = now - (1000 * 60 * 10); // wait for 10 minutes
+      end = Math.min(now, end);
+
+      // TODO REMOVE DEBUG ONLY!
+      end = now;
+
+      System.out.println("Start Date:" + new Date(start));
+      System.out.println("End Date:" + new Date(end));
+
+      DatabaseMetaData dbm = conn.getMetaData();
+      rs = dbm.getColumns(null, null, table, null);
+
+      List<String> cols = new ArrayList<String>();
+      while (rs.next()) {
+        String s = rs.getString(4); // 4 is column name, 5 data type etc.
+        System.out.println("Name: " + s);
+        int type = rs.getInt(5);
+        if (type == java.sql.Types.VARCHAR) {
+          System.out.println("Type: Varchar " + type);
+        } else {
+          cols.add(s);
+          System.out.println("Type: Number " + type);
+        }
+      }// end of while.
+
+      // build insert into from select query
+      String initTable = table.replace("cluster_", "");
+      StringBuilder sb0 = new StringBuilder();
+      StringBuilder sb = new StringBuilder();
+      sb0.append("insert into ").append(table).append(" (");
+      sb.append(" ( select ");
+      for (int i = 0; i < cols.size(); i++) {
+        sb0.append(cols.get(i));
+        sb.append("avg(").append(cols.get(i)).append(") ");
+        if (i < cols.size() - 1) {
+          sb0.append(",");
+          sb.append(",");
+        }
+      }
+      sb.append(" from ").append(initTable);
+      sb.append(" where timestamp between \"");
+      sb.append(formatter.format(start));
+      sb.append("\" and \"").append(formatter.format(end));
+      sb.append("\" group by timestamp  )");
+
+      // close fields
+      sb0.append(" )").append(sb);
+      System.out.println(sb0.toString());
+
+      // run query
+      conn.setAutoCommit(false);
+      stmt = conn.createStatement();
+      stmt.execute(sb0.toString());
+
+      // update last run
+      stmt = conn.createStatement();
+      stmt.execute("insert into aggregation_admin_table set table_ts=\""
+          + formatter.format(end) + "\" where table_name=\"" + table + "\"");
+      conn.commit();
+    }
+
+  }
 
 }
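
[Editor's note] MetricsAggregation.main() above builds a single insert-select statement that pairs each numeric column with its avg(). A compact sketch of that string assembly with a fixed column list (the column names and timestamps below are placeholders):

import java.util.Arrays;
import java.util.List;

public class AggregationQueryDemo {
  public static void main(String[] args) {
    String table = "cluster_system_metrics_2038_week";
    String initTable = table.replace("cluster_", "");
    List<String> cols = Arrays.asList("cpu_user_pcnt", "mem_used");  // placeholder columns

    StringBuilder insert = new StringBuilder("insert into " + table + " (");
    StringBuilder select = new StringBuilder(" ( select ");
    for (int i = 0; i < cols.size(); i++) {
      insert.append(cols.get(i));
      select.append("avg(").append(cols.get(i)).append(") ");
      if (i < cols.size() - 1) {
        insert.append(",");
        select.append(",");
      }
    }
    select.append(" from ").append(initTable)
          .append(" where timestamp between \"2009-03-11 00:00:00\"")
          .append(" and \"2009-03-11 01:00:00\" group by timestamp  )");
    insert.append(" )").append(select);
    System.out.println(insert.toString());
  }
}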

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/TableCreator.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/TableCreator.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/TableCreator.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/database/TableCreator.java Wed Mar 11 22:39:26 2009
@@ -15,112 +15,125 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- 
+
 package org.apache.hadoop.chukwa.database;
 
+
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.chukwa.util.DatabaseWriter;
 
 public class TableCreator {
-	private static DatabaseConfig dbc = null;
-	private static Log log = LogFactory.getLog(TableCreator.class);		
-	public TableCreator() {
-    	if(dbc==null) {
-    	    dbc = new DatabaseConfig();
-    	}
+  private static DatabaseConfig dbc = null;
+  private static Log log = LogFactory.getLog(TableCreator.class);
+
+  public TableCreator() {
+    if (dbc == null) {
+      dbc = new DatabaseConfig();
     }
-	public void createTables() {
-		long now = (new Date()).getTime();
-        createTables(now,now);		
-	}
-	public void createTables(long start, long end) {
-		String cluster = System.getProperty("CLUSTER");
-		if(cluster==null) {
-			cluster="unknown";
-		}
-		DatabaseWriter dbw = new DatabaseWriter(cluster);
-		try {
-			HashMap<String, String> dbNames = dbc.startWith("report.db.name.");
-			Iterator<String> ki = dbNames.keySet().iterator();
-			while(ki.hasNext()) {
-				String name = ki.next();
-				String tableName = dbNames.get(name);
-				String[] tableList = dbc.findTableName(tableName, start, end);
-				log.debug("table name: "+tableList[0]);
-				try {
-				    String[] parts = tableList[0].split("_");
-				    int partition = Integer.parseInt(parts[parts.length-2]);
-				    String table = "";
-				    for(int i=0;i<parts.length-2;i++) {
-				    	if(i!=0) {
-				    		table=table+"_";
-				    	}
-				    	table=table+parts[i];
-				    }
-				    String query = "show create table "+table+"_template;";
-				    ResultSet rs = dbw.query(query);
-                    while(rs.next()) {				    
-                    	log.debug("table schema: "+rs.getString(2));
-                    	query=rs.getString(2);
-                    	log.debug("template table name:"+table+"_template");
-                    	log.debug("replacing with table name:"+table+"_"+partition+"_"+parts[parts.length-1]);
-                    	log.debug("creating table: "+query);
-                    	String createPartition=query.replaceFirst(table+"_template", table+"_"+partition+"_"+parts[parts.length-1]);
-                    	createPartition=createPartition.replaceFirst("TABLE","TABLE IF NOT EXISTS");
-                    	dbw.execute(createPartition);
-                    	partition++;
-                    	createPartition=query.replaceFirst(table+"_template", table+"_"+partition+"_"+parts[parts.length-1]);
-                    	createPartition=createPartition.replaceFirst("TABLE","TABLE IF NOT EXISTS");
-                    	dbw.execute(createPartition);
-                    	partition++;
-                    	createPartition=query.replaceFirst(table+"_template", table+"_"+partition+"_"+parts[parts.length-1]);
-                    	createPartition=createPartition.replaceFirst("TABLE","TABLE IF NOT EXISTS");
-                    	dbw.execute(createPartition);
-                    }
-				} catch(NumberFormatException e) {
-					log.error("Error in parsing table partition number, skipping table:"+tableList[0]);
-				} catch(ArrayIndexOutOfBoundsException e) {
-					log.debug("Skipping table:"+tableList[0]+", because it has no partition configuration.");
-				} catch(SQLException e) {
-					
-				}
-			}
-		} catch(Exception e) {
-			e.printStackTrace();
-		}		
-	}
-
-	public static void usage() {
-		System.out.println("TableCreator usage:");
-		System.out.println("java -jar chukwa-core.jar org.apache.hadoop.chukwa.TableCreator <date> <time window size>");
-		System.out.println("     date format: YYYY-MM-DD");
-		System.out.println("     time window size: 7, 30, 91, 365, 3650");
-	}
-	
-	public static void main(String[] args) {
-		TableCreator tc = new TableCreator();
-        if(args.length==2) {
-        	try {
-        		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
-        		long start = sdf.parse(args[0]).getTime();
-        		long end = start + (Long.parseLong(args[1])*1440*60*1000L);
-        		tc.createTables(start, end);
-        	} catch(Exception e) {
-        		System.out.println("Invalid date format or time window size.");
-        		e.printStackTrace();
-				usage();
-        	}
-		} else {
-			tc.createTables();
-		}
+  }
+
+  public void createTables() {
+    long now = (new Date()).getTime();
+    createTables(now, now);
+  }
 
+  public void createTables(long start, long end) {
+    String cluster = System.getProperty("CLUSTER");
+    if (cluster == null) {
+      cluster = "unknown";
     }
+    DatabaseWriter dbw = new DatabaseWriter(cluster);
+    try {
+      HashMap<String, String> dbNames = dbc.startWith("report.db.name.");
+      Iterator<String> ki = dbNames.keySet().iterator();
+      while (ki.hasNext()) {
+        String name = ki.next();
+        String tableName = dbNames.get(name);
+        String[] tableList = dbc.findTableName(tableName, start, end);
+        log.debug("table name: " + tableList[0]);
+        try {
+          String[] parts = tableList[0].split("_");
+          int partition = Integer.parseInt(parts[parts.length - 2]);
+          String table = "";
+          for (int i = 0; i < parts.length - 2; i++) {
+            if (i != 0) {
+              table = table + "_";
+            }
+            table = table + parts[i];
+          }
+          String query = "show create table " + table + "_template;";
+          ResultSet rs = dbw.query(query);
+          while (rs.next()) {
+            log.debug("table schema: " + rs.getString(2));
+            query = rs.getString(2);
+            log.debug("template table name:" + table + "_template");
+            log.debug("replacing with table name:" + table + "_" + partition
+                + "_" + parts[parts.length - 1]);
+            log.debug("creating table: " + query);
+            String createPartition = query.replaceFirst(table + "_template",
+                table + "_" + partition + "_" + parts[parts.length - 1]);
+            createPartition = createPartition.replaceFirst("TABLE",
+                "TABLE IF NOT EXISTS");
+            dbw.execute(createPartition);
+            partition++;
+            createPartition = query.replaceFirst(table + "_template", table
+                + "_" + partition + "_" + parts[parts.length - 1]);
+            createPartition = createPartition.replaceFirst("TABLE",
+                "TABLE IF NOT EXISTS");
+            dbw.execute(createPartition);
+            partition++;
+            createPartition = query.replaceFirst(table + "_template", table
+                + "_" + partition + "_" + parts[parts.length - 1]);
+            createPartition = createPartition.replaceFirst("TABLE",
+                "TABLE IF NOT EXISTS");
+            dbw.execute(createPartition);
+          }
+        } catch (NumberFormatException e) {
+          log.error("Error in parsing table partition number, skipping table:"
+              + tableList[0]);
+        } catch (ArrayIndexOutOfBoundsException e) {
+          log.debug("Skipping table:" + tableList[0]
+              + ", because it has no partition configuration.");
+        } catch (SQLException e) {
+          // SQL errors while creating the partition are silently ignored
+        }
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  public static void usage() {
+    System.out.println("TableCreator usage:");
+    System.out
+        .println("java -jar chukwa-core.jar org.apache.hadoop.chukwa.TableCreator <date> <time window size>");
+    System.out.println("     date format: YYYY-MM-DD");
+    System.out.println("     time window size: 7, 30, 91, 365, 3650");
+  }
+
+  public static void main(String[] args) {
+    TableCreator tc = new TableCreator();
+    if (args.length == 2) {
+      try {
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
+        long start = sdf.parse(args[0]).getTime();
+        long end = start + (Long.parseLong(args[1]) * 1440 * 60 * 1000L);
+        tc.createTables(start, end);
+      } catch (Exception e) {
+        System.out.println("Invalid date format or time window size.");
+        e.printStackTrace();
+        usage();
+      }
+    } else {
+      tc.createTables();
+    }
+
+  }
 }

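For reference, the reformatted TableCreator above can also be driven programmatically rather than through the command line printed by usage(). A minimal sketch of that path follows; the class name TableCreatorExample is invented, and the import assumes TableCreator lives in org.apache.hadoop.chukwa.database alongside the other database utilities in this commit (the usage string prints org.apache.hadoop.chukwa.TableCreator, so treat the package as an assumption).

import java.text.SimpleDateFormat;
import org.apache.hadoop.chukwa.database.TableCreator;  // package assumed, see note above

public class TableCreatorExample {
  public static void main(String[] args) throws Exception {
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
    long start = sdf.parse("2009-03-01").getTime();
    // 7-day window, mirroring main() above: days * 1440 minutes * 60 seconds * 1000 ms
    long end = start + (7L * 1440 * 60 * 1000);

    TableCreator tc = new TableCreator();
    // Creates the partitioned tables covering [start, end] from the *_template schemas.
    tc.createTables(start, end);
  }
}
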
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/ChunkQueue.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/ChunkQueue.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/ChunkQueue.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/ChunkQueue.java Wed Mar 11 22:39:26 2009
@@ -18,8 +18,8 @@
 
 package org.apache.hadoop.chukwa.datacollection;
 
-import java.util.List;
 
+import java.util.List;
 import org.apache.hadoop.chukwa.Chunk;
 
 /**
@@ -27,24 +27,25 @@
  * 
  * Differs from a normal queue interface primarily by having collect().
  */
-public interface ChunkQueue extends ChunkReceiver
-{
+public interface ChunkQueue extends ChunkReceiver {
   /**
-   *  Add a chunk to the queue, blocking if queue is full.
+   * Add a chunk to the queue, blocking if queue is full.
+   * 
    * @param chunk
    * @throws InterruptedException if thread is interrupted while blocking
    */
-	public void add(Chunk chunk) throws InterruptedException;
-	
-	/**
-	 * Return at least one, and no more than count, Chunks into chunks.
-	 * Blocks if queue is empty.
-	 */
-	public void collect(List<Chunk> chunks,int count) throws InterruptedException;
-	
-	/**
-	 * Return an approximation of the number of chunks in the queue currently.
-	 * No guarantees are made about the accuracy of this number. 
-	 */
-	public int size();
+  public void add(Chunk chunk) throws InterruptedException;
+
+  /**
+   * Return at least one, and no more than count, Chunks into chunks. Blocks if
+   * queue is empty.
+   */
+  public void collect(List<Chunk> chunks, int count)
+      throws InterruptedException;
+
+  /**
+   * Return an approximation of the number of chunks in the queue currently. No
+   * guarantees are made about the accuracy of this number.
+   */
+  public int size();
 }

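The collect() contract above (block while the queue is empty, return at least one and at most count chunks) is easiest to see from the consumer side. A minimal sketch, assuming the queue is obtained from the DataFactory singleton that appears later in this commit; the class name and the batch size of 100 are illustrative only.

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
import org.apache.hadoop.chukwa.datacollection.DataFactory;

public class QueueDrainer implements Runnable {
  public void run() {
    ChunkQueue queue = DataFactory.getInstance().getEventQueue();
    List<Chunk> batch = new ArrayList<Chunk>();
    try {
      while (!Thread.currentThread().isInterrupted()) {
        batch.clear();
        // Blocks until at least one chunk is available; never returns more than 100.
        queue.collect(batch, 100);
        System.out.println("drained " + batch.size() + " chunks, roughly "
            + queue.size() + " still queued");
      }
    } catch (InterruptedException e) {
      // collect() blocks, so an interrupt is the normal way to stop this loop.
    }
  }
}
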
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/ChunkReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/ChunkReceiver.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/ChunkReceiver.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/ChunkReceiver.java Wed Mar 11 22:39:26 2009
@@ -1,15 +1,16 @@
 package org.apache.hadoop.chukwa.datacollection;
 
+
 import org.apache.hadoop.chukwa.Chunk;
 
 public interface ChunkReceiver {
-  
+
   /**
-   *  Add a chunk to the queue, potentially blocking.
+   * Add a chunk to the queue, potentially blocking.
+   * 
    * @param event
    * @throws InterruptedException if thread is interrupted while blocking
    */
   public void add(Chunk event) throws java.lang.InterruptedException;
-  
 
 }

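Since ChunkReceiver is the only callback an adaptor needs, a stub implementation is often enough when exercising an adaptor outside the agent. A minimal sketch; the class name and the counter are invented for illustration.

import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;

public class CountingReceiver implements ChunkReceiver {
  private volatile long received = 0;

  public void add(Chunk event) throws InterruptedException {
    // A real receiver would enqueue or process the chunk; here we only count it.
    received++;
  }

  public long getReceived() {
    return received;
  }
}
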
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java Wed Mar 11 22:39:26 2009
@@ -18,70 +18,67 @@
 
 package org.apache.hadoop.chukwa.datacollection;
 
+
 import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
-
 import org.apache.hadoop.chukwa.datacollection.sender.RetryListOfCollectors;
-
 import org.apache.hadoop.chukwa.datacollection.agent.*;
 import org.apache.log4j.Logger;
 
-public class DataFactory
-{
+public class DataFactory {
   static Logger log = Logger.getLogger(DataFactory.class);
-	static final int QUEUE_SIZE_KB = 10 * 1024;
-	static final String COLLECTORS_FILENAME = "collectors";
-	private static DataFactory dataFactory = null;
-	private ChunkQueue chunkQueue = new MemLimitQueue(QUEUE_SIZE_KB * 1024);
-
-	static 
-	{
-		dataFactory = new DataFactory();
-	}
-
-	private DataFactory()
-	{}
-
-	public static DataFactory getInstance() {
-		return dataFactory;
-	}
-
-	public ChunkQueue getEventQueue() {
-		return chunkQueue;
-	}
-
-	/**
-	 * @return empty list if file does not exist
-	 * @throws IOException on other error
-	 */
-	public Iterator<String> getCollectorURLs() throws IOException
-	{
-		  String chukwaHome = System.getenv("CHUKWA_HOME");
-		  if (chukwaHome == null){
-			  chukwaHome = ".";
-		  }
-
-		  if(!chukwaHome.endsWith("/"))
-		  {  chukwaHome = chukwaHome + File.separator; }
-		  log.info("Config - System.getenv(\"CHUKWA_HOME\"): [" + chukwaHome + "]" );
-
-		  String chukwaConf = System.getenv("CHUKWA_CONF_DIR");    
-		  if (chukwaConf == null)
-		  {
-			  chukwaConf = chukwaHome + "conf" + File.separator;
-		  }
-
-		  log.info("Config - System.getenv(\"chukwaConf\"): [" + chukwaConf + "]" );
-		  
-	  log.info("setting up collectors file: " + chukwaConf + 
-	      File.separator + COLLECTORS_FILENAME);
-		File collectors = new File(chukwaConf + File.separator + "collectors");
-		try{
-		  return new RetryListOfCollectors(collectors, 1000 * 15);//time is ms between tries
-		} catch(java.io.IOException e) {
-			log.error("failed to read collectors file: ", e);
-			throw e;
-		}
-	}
+  static final int QUEUE_SIZE_KB = 10 * 1024;
+  static final String COLLECTORS_FILENAME = "collectors";
+  private static DataFactory dataFactory = null;
+  private ChunkQueue chunkQueue = new MemLimitQueue(QUEUE_SIZE_KB * 1024);
+
+  static {
+    dataFactory = new DataFactory();
+  }
+
+  private DataFactory() {
+  }
+
+  public static DataFactory getInstance() {
+    return dataFactory;
+  }
+
+  public ChunkQueue getEventQueue() {
+    return chunkQueue;
+  }
+
+  /**
+   * @return empty list if file does not exist
+   * @throws IOException on other error
+   */
+  public Iterator<String> getCollectorURLs() throws IOException {
+    String chukwaHome = System.getenv("CHUKWA_HOME");
+    if (chukwaHome == null) {
+      chukwaHome = ".";
+    }
+
+    if (!chukwaHome.endsWith("/")) {
+      chukwaHome = chukwaHome + File.separator;
+    }
+    log.info("Config - System.getenv(\"CHUKWA_HOME\"): [" + chukwaHome + "]");
+
+    String chukwaConf = System.getenv("CHUKWA_CONF_DIR");
+    if (chukwaConf == null) {
+      chukwaConf = chukwaHome + "conf" + File.separator;
+    }
+
+    log.info("Config - System.getenv(\"chukwaConf\"): [" + chukwaConf + "]");
+
+    log.info("setting up collectors file: " + chukwaConf + File.separator
+        + COLLECTORS_FILENAME);
+    File collectors = new File(chukwaConf + File.separator + "collectors");
+    try {
+      return new RetryListOfCollectors(collectors, 1000 * 15);// time is ms
+                                                              // between tries
+    } catch (java.io.IOException e) {
+      log.error("failed to read collectors file: ", e);
+      throw e;
+    }
+  }
 }

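To recap the reformatted DataFactory above: it is a singleton, the event queue it hands out is shared by all adaptors, and getCollectorURLs() reads the collectors file under CHUKWA_CONF_DIR (falling back to CHUKWA_HOME/conf). A minimal sketch of both entry points; the class name is invented, and it assumes the environment points at a conf directory that actually contains a collectors file.

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
import org.apache.hadoop.chukwa.datacollection.DataFactory;

public class DataFactoryExample {
  public static void main(String[] args) throws IOException {
    DataFactory df = DataFactory.getInstance();

    // The shared in-memory queue that adaptors write into.
    ChunkQueue queue = df.getEventQueue();
    System.out.println("queue currently holds roughly " + queue.size() + " chunks");

    // Iterator over collector URLs read from $CHUKWA_CONF_DIR/collectors.
    Iterator<String> urls = df.getCollectorURLs();
    if (urls.hasNext()) {
      System.out.println("first collector: " + urls.next());
    }
  }
}
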
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java Wed Mar 11 22:39:26 2009
@@ -17,76 +17,82 @@
  */
 
 package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+
 import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
 
 /**
- * An adaptor is a component that runs within the Local Agent, producing 
- * chunks of monitoring data.
+ * An adaptor is a component that runs within the Local Agent, producing chunks
+ * of monitoring data.
  * 
- * An adaptor can, but need not, have an associated thread. If an adaptor
- * lacks a thread, it needs to arrange some mechanism to periodically get control
- * and send reports such as a callback somewhere.
+ * An adaptor can, but need not, have an associated thread. If an adaptor lacks
+ * a thread, it needs to arrange some mechanism, such as a callback registered
+ * somewhere, to periodically get control and send reports.
  * 
- * Adaptors must be able to stop and resume without losing data, using
- * a byte offset in the stream.
+ * Adaptors must be able to stop and resume without losing data, using a byte
+ * offset in the stream.
  * 
  * If an adaptor crashes at byte offset n, and is restarted at byte offset k,
- * with k < n, it is allowed to send different values for bytes k through n the 
- * second time around.  However, the stream must still be parseable, assuming that
- * bytes 0-k come from the first run,and bytes k - n come from the second.
+ * with k < n, it is allowed to send different values for bytes k through n the
+ * second time around. However, the stream must still be parseable, assuming
+ * that bytes 0-k come from the first run, and bytes k - n come from the second.
  */
-public interface Adaptor
-{
+public interface Adaptor {
   /**
    * Start this adaptor
+   * 
    * @param type the application type, who is starting this adaptor
    * @param status the status string to use for configuration.
    * @param offset the stream offset of the first byte sent by this adaptor
    * @throws AdaptorException
    */
-	public void start(long adaptorID, String type, String status, long offset, ChunkReceiver dest) throws AdaptorException;
-	
-	/**
-	 * Return the adaptor's state
-	 * Should not include class name, datatype or byte offset, which are written by caller.
-	 * @return the adaptor state as a string
-	 * @throws AdaptorException
-	 */
-	public String getCurrentStatus() throws AdaptorException;
-	public String getType();
-	
-	/**
-	 * Return the stream name
-	 * @return Stream name as a string
-	 */
-	public String getStreamName();
-	/**
-	 * Signals this adaptor to come to an orderly stop.
-	 * The adaptor ought to push out all the data it can
-	 * before exiting.
-	 * 
-	 * This method is synchronous:
-	 * In other words, after shutdown() returns, no new data should be written.
-	 * 
-	 * @return the logical offset at which the adaptor stops
-	 * @throws AdaptorException
-	 */
-	public long shutdown() throws AdaptorException;
-	
-	/**
-	 * Signals this adaptor to come to an abrupt stop, as quickly as it can.
-	 * The use case here is "Whups, I didn't mean to start that adaptor tailing
-	 * a gigabyte file, stop it now".
-	 * 
-	 * Adaptors might need to do something nontrivial here, e.g., in the case in which  
-	 * they have registered periodic timer interrupts, or use a shared worker thread
-	 * from which they need to disengage.
-	 * 
-	 * This method is synchronous:
-   * In other words, after shutdown() returns, no new data should be written.
-   *
-	 * @throws AdaptorException
-	 */
-	public void hardStop() throws AdaptorException;
+  public void start(long adaptorID, String type, String status, long offset,
+      ChunkReceiver dest) throws AdaptorException;
+
+  /**
+   * Return the adaptor's state. Should not include class name, datatype or
+   * byte offset, which are written by the caller.
+   * 
+   * @return the adaptor state as a string
+   * @throws AdaptorException
+   */
+  public String getCurrentStatus() throws AdaptorException;
+
+  public String getType();
+
+  /**
+   * Return the stream name
+   * 
+   * @return Stream name as a string
+   */
+  public String getStreamName();
+
+  /**
+   * Signals this adaptor to come to an orderly stop. The adaptor ought to push
+   * out all the data it can before exiting.
+   * 
+   * This method is synchronous: In other words, after shutdown() returns, no
+   * new data should be written.
+   * 
+   * @return the logical offset at which the adaptor stops
+   * @throws AdaptorException
+   */
+  public long shutdown() throws AdaptorException;
+
+  /**
+   * Signals this adaptor to come to an abrupt stop, as quickly as it can. The
+   * use case here is "Whups, I didn't mean to start that adaptor tailing a
+   * gigabyte file, stop it now".
+   * 
+   * Adaptors might need to do something nontrivial here, e.g., in the case in
+   * which they have registered periodic timer interrupts, or use a shared
+   * worker thread from which they need to disengage.
+   * 
+   * This method is synchronous: In other words, after shutdown() returns, no
+   * new data should be written.
+   * 
+   * @throws AdaptorException
+   */
+  public void hardStop() throws AdaptorException;
 
 }
\ No newline at end of file

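The contract spelled out in the reformatted Adaptor javadoc above (start, the two stop paths, and offset bookkeeping) is easier to see against a concrete, if trivial, implementation. The sketch below emits its status string once as a single chunk and then idles; the class name and the "oneshot" stream name are invented, and it follows the convention used by FileAdaptor and ExecAdaptor elsewhere in this commit of using the offset of the last byte sent as the chunk sequence number.

import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;

public class OneShotAdaptor implements Adaptor {
  private String type;
  private String status;
  private volatile long offset;

  public void start(long adaptorID, String type, String status, long offset,
      ChunkReceiver dest) throws AdaptorException {
    this.type = type;
    this.status = status;
    this.offset = offset;
    byte[] data = status.getBytes();
    this.offset += data.length;
    try {
      // Sequence number is the offset of the last byte sent, as in FileAdaptor.
      dest.add(new ChunkImpl(type, "oneshot", this.offset, data, this));
    } catch (InterruptedException e) {
      throw new AdaptorException(e);
    }
  }

  public String getCurrentStatus() throws AdaptorException {
    // Per the interface contract, class name, datatype and offset are written by the caller.
    return status;
  }

  public String getType() {
    return type;
  }

  public String getStreamName() {
    return "oneshot";
  }

  public long shutdown() throws AdaptorException {
    // Nothing is buffered, so the logical stop offset is simply what has been sent.
    return offset;
  }

  public void hardStop() throws AdaptorException {
    // No timers or worker threads to disengage in this sketch.
  }
}
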
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AdaptorException.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AdaptorException.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AdaptorException.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AdaptorException.java Wed Mar 11 22:39:26 2009
@@ -18,29 +18,25 @@
 
 package org.apache.hadoop.chukwa.datacollection.adaptor;
 
-public class AdaptorException extends Exception
-{
 
-	private static final long serialVersionUID = -8490279345367308690L;
+public class AdaptorException extends Exception {
 
-	public AdaptorException()
-	{
-		super();
-	}
-
-	public AdaptorException(String arg0, Throwable arg1)
-	{
-		super(arg0, arg1);
-	}
-
-	public AdaptorException(String arg0)
-	{
-		super(arg0);
-	}
-
-	public AdaptorException(Throwable arg0)
-	{
-		super(arg0);
-	}
+  private static final long serialVersionUID = -8490279345367308690L;
+
+  public AdaptorException() {
+    super();
+  }
+
+  public AdaptorException(String arg0, Throwable arg1) {
+    super(arg0, arg1);
+  }
+
+  public AdaptorException(String arg0) {
+    super(arg0);
+  }
+
+  public AdaptorException(Throwable arg0) {
+    super(arg0);
+  }
 
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ExecAdaptor.java Wed Mar 11 22:39:26 2009
@@ -17,6 +17,8 @@
  */
 
 package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
 import org.apache.hadoop.chukwa.inputtools.plugin.ExecPlugin;
@@ -27,14 +29,14 @@
 import java.util.*;
 
 /**
- * Runs a command inside chukwa.  Takes as params the interval 
- * in seconds at which to run the command, and the path and args
- * to execute.
+ * Runs a command inside chukwa. Takes as params the interval in seconds at
+ * which to run the command, and the path and args to execute.
  * 
  * Interval is optional, and defaults to 5 seconds.
  * 
- * Example usage:  
- * add org.apache.hadoop.chukwa.datacollection.adaptor.ExecAdaptor Ps 2 /bin/ps aux 0
+ * Example usage: add
+ * org.apache.hadoop.chukwa.datacollection.adaptor.ExecAdaptor Ps 2 /bin/ps aux
+ * 0
  * 
  */
 public class ExecAdaptor extends ExecPlugin implements Adaptor {
@@ -42,77 +44,78 @@
   static final boolean FAKE_LOG4J_HEADER = true;
   static final boolean SPLIT_LINES = false;
   protected long adaptorID = 0;
-  static Logger log =Logger.getLogger(ExecAdaptor.class);
-   
+  static Logger log = Logger.getLogger(ExecAdaptor.class);
+
   class RunToolTask extends TimerTask {
     public void run() {
       JSONObject o = execute();
       try {
-        
-        if(o.getInt("status") == statusKO)
+
+        if (o.getInt("status") == statusKO)
           hardStop();
-        
-         //FIXME: downstream customers would like timestamps here.
-         //Doing that efficiently probably means cutting out all the
-         //excess buffer copies here, and appending into an OutputBuffer. 
+
+        // FIXME: downstream customers would like timestamps here.
+        // Doing that efficiently probably means cutting out all the
+        // excess buffer copies here, and appending into an OutputBuffer.
         byte[] data;
-        if(FAKE_LOG4J_HEADER) {
+        if (FAKE_LOG4J_HEADER) {
           StringBuilder result = new StringBuilder();
           ISO8601DateFormat dateFormat = new org.apache.log4j.helpers.ISO8601DateFormat();
           result.append(dateFormat.format(new java.util.Date()));
           result.append(" INFO org.apache.hadoop.chukwa.");
           result.append(type);
-          result.append(": ");  
+          result.append(": ");
           result.append(o.getString("stdout"));
           data = result.toString().getBytes();
         } else {
           String stdout = o.getString("stdout");
           data = stdout.getBytes();
         }
- 
+
         sendOffset += data.length;
-        ChunkImpl c = new ChunkImpl(ExecAdaptor.this.type,
-            "results from " + cmd, sendOffset , data, ExecAdaptor.this);
-        
-        if(SPLIT_LINES) {
-          ArrayList<Integer> carriageReturns = new  ArrayList<Integer>();
-          for(int i = 0; i < data.length ; ++i)
-            if(data[i] == '\n')
+        ChunkImpl c = new ChunkImpl(ExecAdaptor.this.type, "results from "
+            + cmd, sendOffset, data, ExecAdaptor.this);
+
+        if (SPLIT_LINES) {
+          ArrayList<Integer> carriageReturns = new ArrayList<Integer>();
+          for (int i = 0; i < data.length; ++i)
+            if (data[i] == '\n')
               carriageReturns.add(i);
-          
+
           c.setRecordOffsets(carriageReturns);
-        }  //else we get default one record
-        
+        } // else we get default one record
+
         dest.add(c);
-      } catch(JSONException e ) {
-        //FIXME: log this somewhere
-      } catch (InterruptedException e)  {
+      } catch (JSONException e) {
+        // FIXME: log this somewhere
+      } catch (InterruptedException e) {
         // TODO Auto-generated catch block
-      }catch(AdaptorException e ) {
-        //FIXME: log this somewhere
+      } catch (AdaptorException e) {
+        // FIXME: log this somewhere
       }
     }
   };
-  
+
   String cmd;
   String type;
   ChunkReceiver dest;
   final java.util.Timer timer;
   long period = 5 * 1000;
   volatile long sendOffset = 0;
-  
+
   public ExecAdaptor() {
     timer = new java.util.Timer();
   }
-  
+
   @Override
   public String getCurrentStatus() throws AdaptorException {
     return type + " " + period + " " + cmd + " " + sendOffset;
   }
 
   public String getStreamName() {
-	  return cmd;
+    return cmd;
   }
+
   @Override
   public void hardStop() throws AdaptorException {
     super.stop();
@@ -120,37 +123,37 @@
   }
 
   @Override
-  public long shutdown() throws AdaptorException   { 
+  public long shutdown() throws AdaptorException {
     try {
       timer.cancel();
-      super.waitFor(); //wait for last data to get pushed out
-    } catch(InterruptedException e) {
-     return sendOffset; 
+      super.waitFor(); // wait for last data to get pushed out
+    } catch (InterruptedException e) {
+      return sendOffset;
     }
     return sendOffset;
   }
 
   @Override
-  public void start(long adaptorID, String type, String status, long offset, ChunkReceiver dest)
-      throws AdaptorException  {
-    
+  public void start(long adaptorID, String type, String status, long offset,
+      ChunkReceiver dest) throws AdaptorException {
+
     int spOffset = status.indexOf(' ');
-    if(spOffset > 0) {
-    try {
-      period = Integer.parseInt(status.substring(0, spOffset));
-      cmd = status.substring(spOffset + 1);
-    } catch(NumberFormatException e) {
-      log.warn("ExecAdaptor: sample interval " + status.substring(0, spOffset) + " can't be parsed");
-      cmd = status;
+    if (spOffset > 0) {
+      try {
+        period = Integer.parseInt(status.substring(0, spOffset));
+        cmd = status.substring(spOffset + 1);
+      } catch (NumberFormatException e) {
+        log.warn("ExecAdaptor: sample interval "
+            + status.substring(0, spOffset) + " can't be parsed");
+        cmd = status;
       }
-   }
-    else
+    } else
       cmd = status;
     this.adaptorID = adaptorID;
     this.type = type;
     this.dest = dest;
     this.sendOffset = offset;
-    
+
     TimerTask exec = new RunToolTask();
     timer.schedule(exec, 0L, period);
   }
@@ -159,7 +162,6 @@
   public String getCmde() {
     return cmd;
   }
-  
 
   @Override
   public String getType() {

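The reformatted ExecAdaptor above is normally registered through the agent with the "add ... ExecAdaptor Ps 2 /bin/ps aux 0" line quoted in its javadoc, but it can also be driven directly for a quick test. A rough sketch, assuming /bin/ps exists on the host; the anonymous receiver, the period of 2000 and the ten second sleep are illustrative only.

import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
import org.apache.hadoop.chukwa.datacollection.adaptor.ExecAdaptor;

public class ExecAdaptorExample {
  public static void main(String[] args) throws Exception {
    ExecAdaptor ps = new ExecAdaptor();

    ChunkReceiver printer = new ChunkReceiver() {
      public void add(Chunk event) throws InterruptedException {
        // A real receiver would hand the chunk to the event queue.
        System.out.println("received a chunk of ps output");
      }
    };

    // status is "<period> <command>"; start() hands the parsed period straight
    // to Timer.schedule(), so 2000 here means one sample every 2000 ms.
    ps.start(1L, "Ps", "2000 /bin/ps aux", 0L, printer);

    Thread.sleep(10000);           // let a few samples run
    long offset = ps.shutdown();   // cancels the timer, waits for the last data
    System.out.println("stopped at offset " + offset);
  }
}
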
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java Wed Mar 11 22:39:26 2009
@@ -18,114 +18,118 @@
 
 package org.apache.hadoop.chukwa.datacollection.adaptor;
 
+
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
-import org.apache.hadoop.chukwa.util.RecordConstants; 
+import org.apache.hadoop.chukwa.util.RecordConstants;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
-
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.util.ArrayList;
 
-
 /**
  * File Adaptor pushes a small file to the collector in one chunk
  */
-public class FileAdaptor  implements Adaptor
-{
+public class FileAdaptor implements Adaptor {
+
+  static Logger log;
 
-	static Logger log;
+  protected static Configuration conf = null;
+  private int attempts = 0;
+
+  File toWatch;
+  /**
+   * next PHYSICAL offset to read
+   */
+  protected long fileReadOffset;
+  protected String type;
+  private ChunkReceiver dest;
+  protected RandomAccessFile reader = null;
+  protected long adaptorID;
+
+  /**
+   * The logical offset of the first byte of the file
+   */
+  private long offsetOfFirstByte = 0;
+
+  static {
+    log = Logger.getLogger(FileAdaptor.class);
+  }
+
+  public void start(long adaptorID, String type, String params, long bytes,
+      ChunkReceiver dest) {
+    // in this case params = filename
+    log.info("adaptor id: " + adaptorID + " started file adaptor on file "
+        + params);
+    this.adaptorID = adaptorID;
+    this.type = type;
+    this.dest = dest;
+    this.attempts = 0;
+
+    String[] words = params.split(" ");
+    if (words.length > 1) {
+      offsetOfFirstByte = Long.parseLong(words[0]);
+      toWatch = new File(params.substring(words[0].length() + 1));
+    } else {
+      toWatch = new File(params);
+    }
+    try {
+      reader = new RandomAccessFile(toWatch, "r");
+      long bufSize = toWatch.length();
+      byte[] buf = new byte[(int) bufSize];
+      reader.read(buf);
+      long fileTime = toWatch.lastModified();
+      int bytesUsed = extractRecords(dest, 0, buf, fileTime);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    ChukwaAgent agent = ChukwaAgent.getAgent();
+    if (agent != null) {
+      agent.stopAdaptor(adaptorID, false);
+    } else {
+      log.info("Agent is null, running in default mode");
+    }
+    this.fileReadOffset = bytes;
+  }
+
+  /**
+   * Do one last tail, and then stop
+   * 
+   * @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#shutdown()
+   */
+  public long shutdown() throws AdaptorException {
+    hardStop();
+    return fileReadOffset + offsetOfFirstByte;
+  }
+
+  /**
+   * Stop tailing the file, effective immediately.
+   */
+  public void hardStop() throws AdaptorException {
+  }
+
+  public String getStreamName() {
+    return toWatch.getPath();
+  }
 
-	protected static Configuration conf = null;
-	private int attempts = 0;
-	
-	File toWatch;
-	/**
-	 * next PHYSICAL offset to read
-	 */
-	protected long fileReadOffset;
-	protected String type;
-	private ChunkReceiver dest;
-	protected RandomAccessFile reader = null;
-	protected long adaptorID;
-	
-	/**
-	 * The logical offset of the first byte of the file
-	 */
-	private long offsetOfFirstByte = 0;
-	
-	static {
-		log =Logger.getLogger(FileAdaptor.class);
-	}
-
-	public void start(long adaptorID, String type, String params, long bytes, ChunkReceiver dest) {
-	    //in this case params = filename 
-		log.info("adaptor id: "+adaptorID+" started file adaptor on file " + params);
-		this.adaptorID = adaptorID;
-	    this.type = type;
-	    this.dest = dest;
-	    this.attempts = 0;
-			  
-	    String[] words = params.split(" ");
-	    if(words.length > 1) {
-	        offsetOfFirstByte = Long.parseLong(words[0]);
-	        toWatch = new File(params.substring(words[0].length() + 1));
-	    } else {
-	        toWatch = new File(params);
-	    }
-	    try {
-	  		reader = new RandomAccessFile(toWatch, "r");
-	  		long bufSize = toWatch.length();
-			byte[] buf = new byte[(int) bufSize];
-			reader.read(buf);
-	        long fileTime = toWatch.lastModified();
-			int bytesUsed = extractRecords(dest, 0, buf, fileTime);
-	    } catch(Exception e) {
-	        e.printStackTrace();
-	    }
-		ChukwaAgent agent = ChukwaAgent.getAgent();
-		if (agent != null) {
-			agent.stopAdaptor(adaptorID, false);
-		} else {
-			log.info("Agent is null, running in default mode");
-		}
-		this.fileReadOffset= bytes;
-	}
-
-	/**
-	 * Do one last tail, and then stop
-	 * @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#shutdown()
-	 */
-	public long shutdown() throws AdaptorException {
-	  hardStop();
-	  return fileReadOffset + offsetOfFirstByte;
-	}
-	/**
-	 * Stop tailing the file, effective immediately.
-	 */
-	public void hardStop() throws AdaptorException {
-	}
-
-	public String getStreamName() {
-		return toWatch.getPath();
-	}
-	
   /**
    * Extract records from a byte sequence
+   * 
    * @param eq the queue to stick the new chunk[s] in
    * @param buffOffsetInFile the byte offset in the stream at which buf[] begins
    * @param buf the byte buffer to extract records from
    * @return the number of bytes processed
    * @throws InterruptedException
    */
-  protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile, byte[] buf, long fileTime) throws InterruptedException {
-    ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(), buffOffsetInFile + buf.length,
-        buf, this);
+  protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile,
+      byte[] buf, long fileTime) throws InterruptedException {
+    ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(),
+        buffOffsetInFile + buf.length, buf, this);
     String tags = chunk.getTags();
-    chunk.setTags(tags+" time=\""+fileTime+"\"");
+    chunk.setTags(tags + " time=\"" + fileTime + "\"");
     eq.add(chunk);
     return buf.length;
   }
@@ -137,7 +141,8 @@
 
   @Override
   public String getCurrentStatus() throws AdaptorException {
-    return type.trim() + " " + offsetOfFirstByte+ " " + toWatch.getPath() + " " + fileReadOffset;
+    return type.trim() + " " + offsetOfFirstByte + " " + toWatch.getPath()
+        + " " + fileReadOffset;
   }
-  
+
 }
\ No newline at end of file
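
Likewise, the reformatted FileAdaptor above can be exercised standalone: start() reads the whole file, pushes it to the receiver as a single chunk tagged with the file's modification time, and then asks the agent (if one is running) to stop the adaptor. A rough sketch; the class name and the /tmp/example.txt path are invented, and the file must exist and be small, since it is read into memory in one shot.

import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
import org.apache.hadoop.chukwa.datacollection.adaptor.FileAdaptor;

public class FileAdaptorExample {
  public static void main(String[] args) throws Exception {
    final int[] chunkCount = new int[1];

    ChunkReceiver counter = new ChunkReceiver() {
      public void add(Chunk event) throws InterruptedException {
        chunkCount[0]++;   // FileAdaptor sends the whole file, so this should end up at 1
      }
    };

    FileAdaptor fa = new FileAdaptor();
    // params is just the file name; an optional leading number would set
    // offsetOfFirstByte, as parsed in start() above.
    fa.start(1L, "SmallFile", "/tmp/example.txt", 0L, counter);

    System.out.println("stream: " + fa.getStreamName());
    System.out.println("status: " + fa.getCurrentStatus());
    System.out.println("chunks received: " + chunkCount[0]);
  }
}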