Posted to commits@chukwa.apache.org by as...@apache.org on 2009/03/11 23:39:32 UTC

svn commit: r752666 [6/16] - in /hadoop/chukwa/trunk: ./ src/java/org/apache/hadoop/chukwa/ src/java/org/apache/hadoop/chukwa/conf/ src/java/org/apache/hadoop/chukwa/database/ src/java/org/apache/hadoop/chukwa/datacollection/ src/java/org/apache/hadoop...

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/database/MetricDataLoader.java Wed Mar 11 22:39:26 2009
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.chukwa.extraction.database;
 
+
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -31,7 +32,6 @@
 import java.util.Iterator;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
@@ -46,316 +46,336 @@
 import org.apache.hadoop.io.SequenceFile;
 
 public class MetricDataLoader {
-     private static Log log = LogFactory.getLog(MetricDataLoader.class);     
-	 private static Connection conn = null;    
-     private static Statement stmt = null; 
-     private ResultSet rs = null; 
-     private static DatabaseConfig mdlConfig = null;
-     private static HashMap<String, String> normalize = null;
-     private static HashMap<String, String> transformer = null;
-     private static HashMap<String, Float> conversion = null;
-     private static HashMap<String, String> dbTables = null;
-     private HashMap<String, HashMap<String,Integer>> dbSchema = null;
-     private static String newSpace="-";
-     private static boolean batchMode = true;
-
-     /** Creates a new instance of DBWriter */
-    public MetricDataLoader() {        
-        initEnv("");
+  private static Log log = LogFactory.getLog(MetricDataLoader.class);
+  private static Connection conn = null;
+  private static Statement stmt = null;
+  private ResultSet rs = null;
+  private static DatabaseConfig mdlConfig = null;
+  private static HashMap<String, String> normalize = null;
+  private static HashMap<String, String> transformer = null;
+  private static HashMap<String, Float> conversion = null;
+  private static HashMap<String, String> dbTables = null;
+  private HashMap<String, HashMap<String, Integer>> dbSchema = null;
+  private static String newSpace = "-";
+  private static boolean batchMode = true;
+
+  /** Creates a new instance of DBWriter */
+  public MetricDataLoader() {
+    initEnv("");
+  }
+
+  public MetricDataLoader(String cluster) {
+    initEnv(cluster);
+  }
+
+  private void initEnv(String cluster) {
+    mdlConfig = new DatabaseConfig();
+    transformer = mdlConfig.startWith("metric.");
+    conversion = new HashMap<String, Float>();
+    normalize = mdlConfig.startWith("normalize.");
+    dbTables = mdlConfig.startWith("report.db.name.");
+    Iterator<?> entries = mdlConfig.iterator();
+    while (entries.hasNext()) {
+      String entry = entries.next().toString();
+      if (entry.startsWith("conversion.")) {
+        String[] metrics = entry.split("=");
+        try {
+          float convertNumber = Float.parseFloat(metrics[1]);
+          conversion.put(metrics[0], convertNumber);
+        } catch (NumberFormatException ex) {
+          log.error(metrics[0] + " is not a number.");
+        }
+      }
     }
-    
-    public MetricDataLoader(String cluster) {
-        initEnv(cluster);    	
+    String jdbc_url = "";
+    log.debug("cluster name:" + cluster);
+    if (!cluster.equals("")) {
+      ClusterConfig cc = new ClusterConfig();
+      jdbc_url = cc.getURL(cluster);
     }
-    
-    private void initEnv(String cluster){
-       mdlConfig = new DatabaseConfig();
-       transformer = mdlConfig.startWith("metric.");
-       conversion = new HashMap<String, Float>();
-       normalize = mdlConfig.startWith("normalize.");
-       dbTables = mdlConfig.startWith("report.db.name.");
-       Iterator<?> entries = mdlConfig.iterator();
-       while(entries.hasNext()) {
-           String entry = entries.next().toString();
-           if(entry.startsWith("conversion.")) {
-               String[] metrics = entry.split("=");
-               try {
-            	   float convertNumber = Float.parseFloat(metrics[1]);
-                   conversion.put(metrics[0],convertNumber);               
-               } catch (NumberFormatException ex) {
-                   log.error(metrics[0]+" is not a number.");
-               }
-           }
-       }
-       String jdbc_url = "";
-       log.debug("cluster name:"+cluster);
-       if(!cluster.equals("")) {
-    	   ClusterConfig cc = new ClusterConfig();
-    	   jdbc_url = cc.getURL(cluster);
-       }
-       try {
-           // The newInstance() call is a work around for some
-           // broken Java implementations
-           String jdbcDriver = System.getenv("JDBC_DRIVER");
-           Class.forName(jdbcDriver).newInstance();
-           log.debug("Initialized JDBC URL: "+jdbc_url);
-       } catch (Exception ex) {
-           // handle the error
-           log.error(ex,ex);
-       }
-       try {
-           conn = DriverManager.getConnection(jdbc_url);
-           HashMap<String, String> dbNames = mdlConfig.startWith("report.db.name.");
-           Iterator<String> ki = dbNames.keySet().iterator();
-           dbSchema = new HashMap<String, HashMap<String,Integer>>();
-    	   DatabaseWriter dbWriter = new DatabaseWriter(cluster);
-           while(ki.hasNext()) {
-        	   String table = dbNames.get(ki.next().toString());
-        	   String query = "select * from "+table+"_template limit 1";
-        	   try {
-	        	   ResultSet rs = dbWriter.query(query);
-	        	   ResultSetMetaData rmeta = rs.getMetaData();
-	        	   HashMap<String, Integer> tableSchema = new HashMap<String, Integer>();
-	        	   for(int i=1;i<=rmeta.getColumnCount();i++) {
-	        		   tableSchema.put(rmeta.getColumnName(i),rmeta.getColumnType(i));
-	        	   }
-	               dbSchema.put(table, tableSchema); 
-        	   } catch(SQLException ex) {
-        		   log.debug("table: "+table+" template does not exist, MDL will not load data for this table.");
-        	   }
-           }
-           dbWriter.close();
-       } catch (SQLException ex) {
-           log.error(ex,ex);
-       }       
+    try {
+      // The newInstance() call is a work around for some
+      // broken Java implementations
+      String jdbcDriver = System.getenv("JDBC_DRIVER");
+      Class.forName(jdbcDriver).newInstance();
+      log.debug("Initialized JDBC URL: " + jdbc_url);
+    } catch (Exception ex) {
+      // handle the error
+      log.error(ex, ex);
     }
-    
-    public void interrupt() {
+    try {
+      conn = DriverManager.getConnection(jdbc_url);
+      HashMap<String, String> dbNames = mdlConfig.startWith("report.db.name.");
+      Iterator<String> ki = dbNames.keySet().iterator();
+      dbSchema = new HashMap<String, HashMap<String, Integer>>();
+      DatabaseWriter dbWriter = new DatabaseWriter(cluster);
+      while (ki.hasNext()) {
+        String table = dbNames.get(ki.next().toString());
+        String query = "select * from " + table + "_template limit 1";
+        try {
+          ResultSet rs = dbWriter.query(query);
+          ResultSetMetaData rmeta = rs.getMetaData();
+          HashMap<String, Integer> tableSchema = new HashMap<String, Integer>();
+          for (int i = 1; i <= rmeta.getColumnCount(); i++) {
+            tableSchema.put(rmeta.getColumnName(i), rmeta.getColumnType(i));
+          }
+          dbSchema.put(table, tableSchema);
+        } catch (SQLException ex) {
+          log
+              .debug("table: "
+                  + table
+                  + " template does not exist, MDL will not load data for this table.");
+        }
+      }
+      dbWriter.close();
+    } catch (SQLException ex) {
+      log.error(ex, ex);
     }
-    
-    private String escape(String s,String c){
-        
-        String ns = s.trim();
-        Pattern pattern=Pattern.compile(" +");
-        Matcher matcher = pattern.matcher(ns);
-        String s2= matcher.replaceAll(c);
+  }
 
-        return s2;
+  public void interrupt() {
+  }
 
-      
-    }
-    
-    public void process(Path source)  throws IOException, URISyntaxException, SQLException {
-
-        System.out.println("Input file:" + source.getName());
+  private String escape(String s, String c) {
 
-        ChukwaConfiguration conf = new ChukwaConfiguration();
-        String fsName = conf.get("writer.hdfs.filesystem");
-        FileSystem fs = FileSystem.get(new URI(fsName), conf);
-
-        SequenceFile.Reader r = 
-			new SequenceFile.Reader(fs,source, conf);
-
-        stmt = conn.createStatement(); 
-        conn.setAutoCommit(false);
-        
-        ChukwaRecordKey key = new ChukwaRecordKey();
-        ChukwaRecord record = new ChukwaRecord();
+    String ns = s.trim();
+    Pattern pattern = Pattern.compile(" +");
+    Matcher matcher = pattern.matcher(ns);
+    String s2 = matcher.replaceAll(c);
+
+    return s2;
+
+  }
+
+  public void process(Path source) throws IOException, URISyntaxException,
+      SQLException {
+
+    System.out.println("Input file:" + source.getName());
+
+    ChukwaConfiguration conf = new ChukwaConfiguration();
+    String fsName = conf.get("writer.hdfs.filesystem");
+    FileSystem fs = FileSystem.get(new URI(fsName), conf);
+
+    SequenceFile.Reader r = new SequenceFile.Reader(fs, source, conf);
+
+    stmt = conn.createStatement();
+    conn.setAutoCommit(false);
+
+    ChukwaRecordKey key = new ChukwaRecordKey();
+    ChukwaRecord record = new ChukwaRecord();
+    try {
+      int batch = 0;
+      while (r.next(key, record)) {
+        boolean isSuccessful = true;
+        String sqlTime = DatabaseWriter.formatTimeStamp(record.getTime());
+        log.debug("Timestamp: " + record.getTime());
+        log.debug("DataType: " + key.getReduceType());
+        log.debug("StreamName: " + source.getName());
+
+        String[] fields = record.getFields();
+        String table = null;
+        String[] priKeys = null;
+        HashMap<String, HashMap<String, String>> hashReport = new HashMap<String, HashMap<String, String>>();
+        String normKey = new String();
+        String node = record.getValue("csource");
+        String recordType = key.getReduceType().toLowerCase();
+        if (dbTables.containsKey("report.db.name." + recordType)) {
+
+          String[] tmp = mdlConfig.findTableName(mdlConfig
+              .get("report.db.name." + recordType), record.getTime(), record
+              .getTime());
+          table = tmp[0];
+        } else {
+          log.debug("report.db.name." + recordType + " does not exist.");
+          continue;
+        }
+        log.debug("table name:" + table);
         try {
-            int batch=0;
-            while (r.next(key, record)) {
-                    boolean isSuccessful=true;
-                    String sqlTime = DatabaseWriter.formatTimeStamp(record.getTime());
-                    log.debug("Timestamp: " + record.getTime());
-                    log.debug("DataType: " + key.getReduceType());
-                    log.debug("StreamName: " + source.getName());
-		
-                    String[] fields = record.getFields();
-	            String table = null;
-	            String[] priKeys = null;
-	            HashMap<String, HashMap<String, String>> hashReport = new HashMap<String ,HashMap<String, String>>();
-	            String normKey = new String();
-	            String node = record.getValue("csource");
-	            String recordType = key.getReduceType().toLowerCase();
-	            if(dbTables.containsKey("report.db.name."+recordType)) {
-	            	
-	            	String[] tmp = mdlConfig.findTableName(mdlConfig.get("report.db.name."+recordType), record.getTime(), record.getTime()); 
-	                table = tmp[0];
-	            } else {
-	            	log.debug("report.db.name."+recordType+" does not exist.");
-	            	continue;
-	            }
-	            log.debug("table name:"+table);
-	            try {
-	                priKeys = mdlConfig.get("report.db.primary.key."+recordType).split(",");
-	            } catch (Exception nullException) {
-	            }
-	            for (String field: fields) {
-	            	String keyName = escape(field.toLowerCase(),newSpace);
-	                String keyValue = escape(record.getValue(field).toLowerCase(),newSpace);
-	                if(normalize.containsKey("normalize." + recordType + "." + keyName)) {
-	                	if(normKey.equals("")) {
-	                        normKey = keyName + "." + keyValue;
-	                	} else {
-	                		normKey = normKey + "." + keyName + "." + keyValue;
-	                	}
-	                }
-	                String normalizedKey = "metric." + recordType + "." + normKey;
-	                if(hashReport.containsKey(node)) {
-	                    HashMap<String, String> tmpHash = hashReport.get(node);
-	                    tmpHash.put(normalizedKey, keyValue);
-	                    hashReport.put(node, tmpHash);
-	                } else {
-	                    HashMap<String, String> tmpHash = new HashMap<String, String>();
-	                    tmpHash.put(normalizedKey, keyValue);
-	                    hashReport.put(node, tmpHash);
-	                }
-	            }
-	            for (String field: fields){                
-	                String valueName=escape(field.toLowerCase(),newSpace);
-	                String valueValue=escape(record.getValue(field).toLowerCase(),newSpace);
-	                String normalizedKey = "metric." + recordType + "." + valueName;
-	                if(!normKey.equals("")) {
-	                	normalizedKey = "metric." + recordType + "." + normKey + "." + valueName;
-	                }
-	                if(hashReport.containsKey(node)) {
-	                    HashMap<String, String> tmpHash = hashReport.get(node);
-	                    tmpHash.put(normalizedKey, valueValue);
-	                    hashReport.put(node, tmpHash);
-	                } else {
-	                    HashMap<String, String> tmpHash = new HashMap<String, String>();
-	                    tmpHash.put(normalizedKey, valueValue);
-	                    hashReport.put(node, tmpHash);
-	                    
-	                }
-
-	            }
-	            Iterator<String> i = hashReport.keySet().iterator();
-	            while(i.hasNext()) {
-	                long currentTimeMillis = System.currentTimeMillis();
-	                Object iteratorNode = i.next();
-	                HashMap<String, String> recordSet = hashReport.get(iteratorNode);
-	                Iterator<String> fi = recordSet.keySet().iterator();
-	                // Map any primary key that was not included in the report keyName
-	                String sqlPriKeys = "";
-	                try {
-	                    for (String priKey : priKeys) {
-	                        if(priKey.equals("timestamp")) {
-	                            sqlPriKeys = sqlPriKeys + priKey + " = \"" + sqlTime +"\"";
-	                        }
-	                        if(!priKey.equals(priKeys[priKeys.length-1])) {
-	                            sqlPriKeys = sqlPriKeys + ", ";
-	                        }
-	                    }
-	                } catch (Exception nullException) {
-	                    // ignore if primary key is empty
-	                }
-	                // Map the hash objects to database table columns
-	                String sqlValues = "";
-	                boolean firstValue=true;
-	                while(fi.hasNext()) {
-	                    String fieldKey = (String) fi.next();
-	                    if(transformer.containsKey(fieldKey)) {
-		                	if(!firstValue) {
-		                		sqlValues=sqlValues+", ";
-		                	}
-	                    	try {
-	                        	if(dbSchema.get(dbTables.get("report.db.name."+recordType)).get(transformer.get(fieldKey))== java.sql.Types.VARCHAR||
-	                        			dbSchema.get(dbTables.get("report.db.name."+recordType)).get(transformer.get(fieldKey))== java.sql.Types.BLOB) {
-		                        	if(conversion.containsKey("conversion."+fieldKey)) {
-		                                sqlValues = sqlValues + transformer.get(fieldKey) + "=" + recordSet.get(fieldKey) + conversion.get("conversion."+fieldKey).toString();
-		                        	} else {
-			                            sqlValues = sqlValues + transformer.get(fieldKey) + "=\"" + recordSet.get(fieldKey)+"\"";                                                                
-		                        	}
-	                        	} else {
-	                        		double tmp;
-	                        		tmp=Double.parseDouble(recordSet.get(fieldKey).toString());
-	                        		if(conversion.containsKey("conversion."+fieldKey)) {
-	                        			tmp=tmp*Double.parseDouble(conversion.get("conversion."+fieldKey).toString());
-	                        		}
-	                        		if(Double.isNaN(tmp)) {
-	                        			tmp=0;
-	                        		}
-	                        		sqlValues = sqlValues + transformer.get(fieldKey) + "=" + tmp;
-	                        	}
-	    	                    firstValue=false;	
-	                        } catch (NumberFormatException ex) {
-	                        	if(conversion.containsKey("conversion."+fieldKey)) {
-	                                sqlValues = sqlValues + transformer.get(fieldKey) + "=" + recordSet.get(fieldKey) + conversion.get("conversion."+fieldKey).toString();
-	                        	} else {
-		                            sqlValues = sqlValues + transformer.get(fieldKey) + "=\"" + recordSet.get(fieldKey)+"\"";                                                                
-	                        	}
-	    	                    firstValue=false;
-	                        }
-	                    }
-	                }                
-	
-	                String sql = null;
-	                if(sqlPriKeys.length()>0) {
-	                    sql = "INSERT INTO " + table + " SET " + sqlPriKeys + "," + sqlValues + 
-                        " ON DUPLICATE KEY UPDATE " + sqlPriKeys + "," + sqlValues + ";";	                	
-	                } else {
-	                    sql = "INSERT INTO " + table + " SET " + sqlValues + 
-	                          " ON DUPLICATE KEY UPDATE " + sqlValues + ";";
-	                }
-	                log.debug(sql);
-	                if(batchMode) {
-		                stmt.addBatch(sql);
-		                batch++;
-	                } else {
-	                	stmt.execute(sql);
-	                }
-	                String logMsg = (isSuccessful ? "Saved" : "Error occurred in saving");
-	                long latencyMillis = System.currentTimeMillis() - currentTimeMillis;
-	                int latencySeconds = ((int)(latencyMillis + 500)) / 1000;
-	    			if(batchMode && batch>20000) {
-	    			    int[] updateCounts = stmt.executeBatch();
-	    			    batch=0;
-	    			}
-	                log.debug(logMsg + " (" + recordType + "," + RecordUtil.getClusterName(record) +
-	                       "," + record.getTime() +
-	                       ") " + latencySeconds + " sec");	               
-	            }
-
-			}
-			if(batchMode) {
-			    int[] updateCounts = stmt.executeBatch();
-			}
-		} catch (SQLException ex) {
-			// handle any errors
-			log.error(ex, ex);
-			log.error("SQLException: " + ex.getMessage());
-			log.error("SQLState: " + ex.getSQLState());
-			log.error("VendorError: " + ex.getErrorCode());
-		} catch (Exception e) {
-			e.printStackTrace();
-		} finally {
-            if (rs != null) {
-                try {
-                    rs.close();
-                } catch (SQLException sqlEx) {
-                    // ignore
-                }
-                rs = null;
+          priKeys = mdlConfig.get("report.db.primary.key." + recordType).split(
+              ",");
+        } catch (Exception nullException) {
+        }
+        for (String field : fields) {
+          String keyName = escape(field.toLowerCase(), newSpace);
+          String keyValue = escape(record.getValue(field).toLowerCase(),
+              newSpace);
+          if (normalize.containsKey("normalize." + recordType + "." + keyName)) {
+            if (normKey.equals("")) {
+              normKey = keyName + "." + keyValue;
+            } else {
+              normKey = normKey + "." + keyName + "." + keyValue;
             }
-            if (stmt != null) {
-                try {
-                    stmt.close();
-                } catch (SQLException sqlEx) {
-                    // ignore
+          }
+          String normalizedKey = "metric." + recordType + "." + normKey;
+          if (hashReport.containsKey(node)) {
+            HashMap<String, String> tmpHash = hashReport.get(node);
+            tmpHash.put(normalizedKey, keyValue);
+            hashReport.put(node, tmpHash);
+          } else {
+            HashMap<String, String> tmpHash = new HashMap<String, String>();
+            tmpHash.put(normalizedKey, keyValue);
+            hashReport.put(node, tmpHash);
+          }
+        }
+        for (String field : fields) {
+          String valueName = escape(field.toLowerCase(), newSpace);
+          String valueValue = escape(record.getValue(field).toLowerCase(),
+              newSpace);
+          String normalizedKey = "metric." + recordType + "." + valueName;
+          if (!normKey.equals("")) {
+            normalizedKey = "metric." + recordType + "." + normKey + "."
+                + valueName;
+          }
+          if (hashReport.containsKey(node)) {
+            HashMap<String, String> tmpHash = hashReport.get(node);
+            tmpHash.put(normalizedKey, valueValue);
+            hashReport.put(node, tmpHash);
+          } else {
+            HashMap<String, String> tmpHash = new HashMap<String, String>();
+            tmpHash.put(normalizedKey, valueValue);
+            hashReport.put(node, tmpHash);
+
+          }
+
+        }
+        Iterator<String> i = hashReport.keySet().iterator();
+        while (i.hasNext()) {
+          long currentTimeMillis = System.currentTimeMillis();
+          Object iteratorNode = i.next();
+          HashMap<String, String> recordSet = hashReport.get(iteratorNode);
+          Iterator<String> fi = recordSet.keySet().iterator();
+          // Map any primary key that was not included in the report keyName
+          String sqlPriKeys = "";
+          try {
+            for (String priKey : priKeys) {
+              if (priKey.equals("timestamp")) {
+                sqlPriKeys = sqlPriKeys + priKey + " = \"" + sqlTime + "\"";
+              }
+              if (!priKey.equals(priKeys[priKeys.length - 1])) {
+                sqlPriKeys = sqlPriKeys + ", ";
+              }
+            }
+          } catch (Exception nullException) {
+            // ignore if primary key is empty
+          }
+          // Map the hash objects to database table columns
+          String sqlValues = "";
+          boolean firstValue = true;
+          while (fi.hasNext()) {
+            String fieldKey = (String) fi.next();
+            if (transformer.containsKey(fieldKey)) {
+              if (!firstValue) {
+                sqlValues = sqlValues + ", ";
+              }
+              try {
+                if (dbSchema.get(dbTables.get("report.db.name." + recordType))
+                    .get(transformer.get(fieldKey)) == java.sql.Types.VARCHAR
+                    || dbSchema.get(
+                        dbTables.get("report.db.name." + recordType)).get(
+                        transformer.get(fieldKey)) == java.sql.Types.BLOB) {
+                  if (conversion.containsKey("conversion." + fieldKey)) {
+                    sqlValues = sqlValues + transformer.get(fieldKey) + "="
+                        + recordSet.get(fieldKey)
+                        + conversion.get("conversion." + fieldKey).toString();
+                  } else {
+                    sqlValues = sqlValues + transformer.get(fieldKey) + "=\""
+                        + recordSet.get(fieldKey) + "\"";
+                  }
+                } else {
+                  double tmp;
+                  tmp = Double.parseDouble(recordSet.get(fieldKey).toString());
+                  if (conversion.containsKey("conversion." + fieldKey)) {
+                    tmp = tmp
+                        * Double.parseDouble(conversion.get(
+                            "conversion." + fieldKey).toString());
+                  }
+                  if (Double.isNaN(tmp)) {
+                    tmp = 0;
+                  }
+                  sqlValues = sqlValues + transformer.get(fieldKey) + "=" + tmp;
+                }
+                firstValue = false;
+              } catch (NumberFormatException ex) {
+                if (conversion.containsKey("conversion." + fieldKey)) {
+                  sqlValues = sqlValues + transformer.get(fieldKey) + "="
+                      + recordSet.get(fieldKey)
+                      + conversion.get("conversion." + fieldKey).toString();
+                } else {
+                  sqlValues = sqlValues + transformer.get(fieldKey) + "=\""
+                      + recordSet.get(fieldKey) + "\"";
                 }
-                stmt = null;
+                firstValue = false;
+              }
             }
-        }    	
+          }
+
+          String sql = null;
+          if (sqlPriKeys.length() > 0) {
+            sql = "INSERT INTO " + table + " SET " + sqlPriKeys + ","
+                + sqlValues + " ON DUPLICATE KEY UPDATE " + sqlPriKeys + ","
+                + sqlValues + ";";
+          } else {
+            sql = "INSERT INTO " + table + " SET " + sqlValues
+                + " ON DUPLICATE KEY UPDATE " + sqlValues + ";";
+          }
+          log.debug(sql);
+          if (batchMode) {
+            stmt.addBatch(sql);
+            batch++;
+          } else {
+            stmt.execute(sql);
+          }
+          String logMsg = (isSuccessful ? "Saved" : "Error occurred in saving");
+          long latencyMillis = System.currentTimeMillis() - currentTimeMillis;
+          int latencySeconds = ((int) (latencyMillis + 500)) / 1000;
+          if (batchMode && batch > 20000) {
+            int[] updateCounts = stmt.executeBatch();
+            batch = 0;
+          }
+          log.debug(logMsg + " (" + recordType + ","
+              + RecordUtil.getClusterName(record) + "," + record.getTime()
+              + ") " + latencySeconds + " sec");
+        }
+
+      }
+      if (batchMode) {
+        int[] updateCounts = stmt.executeBatch();
+      }
+    } catch (SQLException ex) {
+      // handle any errors
+      log.error(ex, ex);
+      log.error("SQLException: " + ex.getMessage());
+      log.error("SQLState: " + ex.getSQLState());
+      log.error("VendorError: " + ex.getErrorCode());
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      if (rs != null) {
+        try {
+          rs.close();
+        } catch (SQLException sqlEx) {
+          // ignore
+        }
+        rs = null;
+      }
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException sqlEx) {
+          // ignore
+        }
+        stmt = null;
+      }
     }
-    
-	public static void main(String[] args) {
-		try {
-			MetricDataLoader mdl = new MetricDataLoader(args[0]);
-			mdl.process(new Path(args[1]));
-		} catch(Exception e) {
-			e.printStackTrace();
-		}
+  }
+
+  public static void main(String[] args) {
+    try {
+      MetricDataLoader mdl = new MetricDataLoader(args[0]);
+      mdl.process(new Path(args[1]));
+    } catch (Exception e) {
+      e.printStackTrace();
     }
-    
+  }
+
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java Wed Mar 11 22:39:26 2009
@@ -26,20 +26,19 @@
 import org.apache.hadoop.mapred.lib.MultipleSequenceFileOutputFormat;
 import org.apache.log4j.Logger;
 
-public class ChukwaRecordOutputFormat extends MultipleSequenceFileOutputFormat<ChukwaRecordKey, ChukwaRecord>
-{
-	static Logger log = Logger.getLogger(ChukwaRecordOutputFormat.class);
-	
-	@Override
-	protected String generateFileNameForKeyValue(ChukwaRecordKey key, ChukwaRecord record,
-			String name)
-	{
-		String output = RecordUtil.getClusterName(record)
-							+ "/" + key.getReduceType() 
-							+ "/" + key.getReduceType() + Util.generateTimeOutput(record.getTime());
+public class ChukwaRecordOutputFormat extends
+    MultipleSequenceFileOutputFormat<ChukwaRecordKey, ChukwaRecord> {
+  static Logger log = Logger.getLogger(ChukwaRecordOutputFormat.class);
 
-		//{log.info("ChukwaOutputFormat.fileName: [" + output +"]");}
-	
-		return output;
-	}
+  @Override
+  protected String generateFileNameForKeyValue(ChukwaRecordKey key,
+      ChukwaRecord record, String name) {
+    String output = RecordUtil.getClusterName(record) + "/"
+        + key.getReduceType() + "/" + key.getReduceType()
+        + Util.generateTimeOutput(record.getTime());
+
+    // {log.info("ChukwaOutputFormat.fileName: [" + output +"]");}
+
+    return output;
+  }
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordPartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordPartitioner.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordPartitioner.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordPartitioner.java Wed Mar 11 22:39:26 2009
@@ -18,30 +18,34 @@
 
 package org.apache.hadoop.chukwa.extraction.demux;
 
+
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Partitioner;
 import org.apache.log4j.Logger;
 
-public class ChukwaRecordPartitioner<K, V> 
-	implements Partitioner<ChukwaRecordKey, ChukwaRecord>
-{
-	static Logger log = Logger.getLogger(ChukwaRecordPartitioner.class);
-	public void configure(JobConf arg0)
-	{}
-
-	public int getPartition(org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey key,
-			org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord record, int numReduceTasks)
-	{
-		if (log.isDebugEnabled())
-		{
-
-			log.debug("Partitioner key: [" + key.getReduceType() 
-					+ "] - Reducer:"
-					+ ( (key.getReduceType().hashCode() & Integer.MAX_VALUE) % numReduceTasks));	
-		}
-		return (key.getReduceType().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
-	}
+public class ChukwaRecordPartitioner<K, V> implements
+    Partitioner<ChukwaRecordKey, ChukwaRecord> {
+  static Logger log = Logger.getLogger(ChukwaRecordPartitioner.class);
+
+  public void configure(JobConf arg0) {
+  }
+
+  public int getPartition(
+      org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey key,
+      org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord record,
+      int numReduceTasks) {
+    if (log.isDebugEnabled()) {
+
+      log
+          .debug("Partitioner key: ["
+              + key.getReduceType()
+              + "] - Reducer:"
+              + ((key.getReduceType().hashCode() & Integer.MAX_VALUE) % numReduceTasks));
+    }
+    return (key.getReduceType().hashCode() & Integer.MAX_VALUE)
+        % numReduceTasks;
+  }
 
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java Wed Mar 11 22:39:26 2009
@@ -18,13 +18,13 @@
 
 package org.apache.hadoop.chukwa.extraction.demux;
 
+
 import java.io.IOException;
 import java.net.URI;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.List;
-
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
@@ -45,198 +45,195 @@
 import org.apache.log4j.Logger;
 
 // TODO do an abstract class for all rolling 
-public class DailyChukwaRecordRolling extends Configured implements Tool
-{
-	static Logger log = Logger.getLogger(DailyChukwaRecordRolling.class);
-	
-	static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
-	static ChukwaConfiguration conf = null;
-	static FileSystem fs = null;
-	static final String HadoopLogDir = "_logs";
-	static final String hadoopTempDir = "_temporary";
-	
-	static boolean rollInSequence = true;
-	static boolean deleteRawdata = false;
-
-	public static void usage()
-	{
-		System.err.println("usage: java org.apache.hadoop.chukwa.extraction.demux.DailyChukwaRecordRolling rollInSequence <True/False> deleteRawdata <True/False>");
-		System.exit(-1);
-	}
-	
-	
-	public static void buildDailyFiles(String chukwaMainRepository, String tempDir,String rollingFolder, int workingDay) throws IOException
-	{
-		// process
-		Path dayPath = new Path(rollingFolder + "/daily/" + workingDay) ;
-		FileStatus[] clustersFS = fs.listStatus(dayPath);
-		for(FileStatus clusterFs : clustersFS)
-		{
-			String cluster = clusterFs.getPath().getName();
-			
-			Path dataSourceClusterHourPaths = new Path(rollingFolder + "/daily/" + workingDay + "/" + cluster) ;
-			FileStatus[] dataSourcesFS = fs.listStatus(dataSourceClusterHourPaths);
-			for(FileStatus dataSourceFS : dataSourcesFS)
-			{
-				String dataSource = dataSourceFS.getPath().getName();
-				// Repo path = reposRootDirectory/<cluster>/<day>/*/*.evt
-				
-				// put the rotate flag
-				fs.mkdirs(new Path(chukwaMainRepository + "/" + cluster + "/" + dataSource + "/" + workingDay + "/rotateDone"));
-				
-				// rotate
-				// Merge
-				String[] mergeArgs = new String[5];
-				// input
-				mergeArgs[0] = chukwaMainRepository + "/" + cluster + "/" + dataSource + "/" + workingDay +  "/[0-24]*/*.evt";
-				// temp dir
-				mergeArgs[1] = tempDir + "/" + cluster + "/" + dataSource + "/" + workingDay + "_" + System.currentTimeMillis();
-				// final output dir
-				mergeArgs[2] = chukwaMainRepository + "/" + cluster + "/" + dataSource + "/" + workingDay  ;
-				// final output fileName
-				mergeArgs[3] =  dataSource +"_" + workingDay ;
-				// delete rolling directory
-				mergeArgs[4] = rollingFolder + "/daily/" + workingDay + "/" + cluster + "/" + dataSource; 
-						
-				
-				log.info("DailyChukwaRecordRolling 0: " +  mergeArgs[0] );
-				log.info("DailyChukwaRecordRolling 1: " +  mergeArgs[1] );
-				log.info("DailyChukwaRecordRolling 2: " +  mergeArgs[2] );
-				log.info("DailyChukwaRecordRolling 3: " +  mergeArgs[3] );
-				log.info("DailyChukwaRecordRolling 4: " +  mergeArgs[4] );
-				
-				RecordMerger merge = new RecordMerger(conf,fs,new DailyChukwaRecordRolling(),mergeArgs,deleteRawdata);
-				List<RecordMerger> allMerge = new ArrayList<RecordMerger>();
-				if (rollInSequence)
-				{ merge.run(); }
-				else
-				{ 
-					allMerge.add(merge);
-					merge.start();
-				}
-				
-				// join all Threads
-				if (!rollInSequence)
-				{
-					while(allMerge.size() > 0)
-					{
-						RecordMerger m = allMerge.remove(0);
-						try
-						{ m.join(); } 
-						catch (InterruptedException e) {}
-					}
-				} // End if (!rollInSequence)
-				
-				// Delete the processed dataSourceFS
-				FileUtil.fullyDelete(fs,dataSourceFS.getPath());
-				
-			} // End for(FileStatus dataSourceFS : dataSourcesFS)
-			
-			// Delete the processed clusterFs
-			FileUtil.fullyDelete(fs,clusterFs.getPath());
-			
-		} // End for(FileStatus clusterFs : clustersFS)
-		
-		// Delete the processed dayPath
-		FileUtil.fullyDelete(fs,dayPath);
-	}
-	
-	/**
-	 * @param args
-	 * @throws Exception 
-	 */
-	public static void main(String[] args) throws  Exception
-	{
-		conf = new ChukwaConfiguration();
-		String fsName = conf.get("writer.hdfs.filesystem");
-		fs = FileSystem.get(new URI(fsName), conf);
-		
-		// TODO read from config
-		String rollingFolder = "/chukwa/rolling/";
-		String chukwaMainRepository = "/chukwa/repos/";
-		String tempDir = "/chukwa/temp/dailyRolling/";
-		
-		
-		// TODO do a real parameter parsing
-		if (args.length != 4)
-		 { usage(); }
-		
-		if (!args[0].equalsIgnoreCase("rollInSequence"))
-		 { usage(); }
-		
-		if (!args[2].equalsIgnoreCase("deleteRawdata"))
-		 { usage(); }
-			
-		if (args[1].equalsIgnoreCase("true"))
-		 { rollInSequence = true; }
-		else
-		 { rollInSequence = false; }
-		
-		if (args[3].equalsIgnoreCase("true"))
-		 { deleteRawdata = true; }
-		else
-		 { deleteRawdata = false; }
-		
-
-		log.info("rollInSequence: " + rollInSequence);
-		log.info("deleteRawdata: " + deleteRawdata);
-		
-		Calendar calendar = Calendar.getInstance();	
-		int currentDay = Integer.parseInt(sdf.format(calendar.getTime()));
-		int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
-		log.info("CurrentDay: " + currentDay);
-		log.info("currentHour" + currentHour);
-	
-		Path rootFolder = new Path(rollingFolder + "/daily/") ;
-		
-		FileStatus[] daysFS = fs.listStatus(rootFolder);
-		for(FileStatus dayFS : daysFS)
-		{
-			try
-			{ 
-				int workingDay = Integer.parseInt(dayFS.getPath().getName());
-				if (  workingDay < currentDay)
-				{
-					buildDailyFiles(chukwaMainRepository, tempDir,rollingFolder,workingDay);
-				} // End if (  workingDay < currentDay)
-			} // End Try workingDay = Integer.parseInt(sdf.format(dayFS.getPath().getName()));
-			catch(NumberFormatException e)
-			{ /* Not a standard Day directory skip */ }
-			
-		} // for(FileStatus dayFS : daysFS)		
-	}
-	
-	
-	public int run(String[] args) throws Exception
-	{
-		JobConf conf = new JobConf(getConf(), DailyChukwaRecordRolling.class);
-
-		conf.setJobName("DailyChukwa-Rolling");
-		conf.setInputFormat(SequenceFileInputFormat.class);
-		
-		conf.setMapperClass(IdentityMapper.class);
-		conf.setReducerClass(IdentityReducer.class);
-
-				
-		conf.setOutputKeyClass(ChukwaRecordKey.class);
-		conf.setOutputValueClass(ChukwaRecord.class);
-		conf.setOutputFormat(SequenceFileOutputFormat.class);
-		
-		conf.set("mapred.compress.map.output", "true");
-		conf.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.LzoCodec");
-		conf.set("mapred.output.compress", "true");
-		conf.set("mapred.output.compression.type", "BLOCK");
-	
-		
-		log.info("DailyChukwaRecordRolling input: " +  args[0] );
-		log.info("DailyChukwaRecordRolling output: " +  args[1] );
-		
-		
-		FileInputFormat.setInputPaths(conf, args[0]);
-		FileOutputFormat.setOutputPath(conf, new Path(args[1]));
-
-		JobClient.runJob(conf);
-		return 0;
-	}
-	
+public class DailyChukwaRecordRolling extends Configured implements Tool {
+  static Logger log = Logger.getLogger(DailyChukwaRecordRolling.class);
+
+  static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
+  static ChukwaConfiguration conf = null;
+  static FileSystem fs = null;
+  static final String HadoopLogDir = "_logs";
+  static final String hadoopTempDir = "_temporary";
+
+  static boolean rollInSequence = true;
+  static boolean deleteRawdata = false;
+
+  public static void usage() {
+    System.err
+        .println("usage: java org.apache.hadoop.chukwa.extraction.demux.DailyChukwaRecordRolling rollInSequence <True/False> deleteRawdata <True/False>");
+    System.exit(-1);
+  }
+
+  public static void buildDailyFiles(String chukwaMainRepository,
+      String tempDir, String rollingFolder, int workingDay) throws IOException {
+    // process
+    Path dayPath = new Path(rollingFolder + "/daily/" + workingDay);
+    FileStatus[] clustersFS = fs.listStatus(dayPath);
+    for (FileStatus clusterFs : clustersFS) {
+      String cluster = clusterFs.getPath().getName();
+
+      Path dataSourceClusterHourPaths = new Path(rollingFolder + "/daily/"
+          + workingDay + "/" + cluster);
+      FileStatus[] dataSourcesFS = fs.listStatus(dataSourceClusterHourPaths);
+      for (FileStatus dataSourceFS : dataSourcesFS) {
+        String dataSource = dataSourceFS.getPath().getName();
+        // Repo path = reposRootDirectory/<cluster>/<day>/*/*.evt
+
+        // put the rotate flag
+        fs.mkdirs(new Path(chukwaMainRepository + "/" + cluster + "/"
+            + dataSource + "/" + workingDay + "/rotateDone"));
+
+        // rotate
+        // Merge
+        String[] mergeArgs = new String[5];
+        // input
+        mergeArgs[0] = chukwaMainRepository + "/" + cluster + "/" + dataSource
+            + "/" + workingDay + "/[0-24]*/*.evt";
+        // temp dir
+        mergeArgs[1] = tempDir + "/" + cluster + "/" + dataSource + "/"
+            + workingDay + "_" + System.currentTimeMillis();
+        // final output dir
+        mergeArgs[2] = chukwaMainRepository + "/" + cluster + "/" + dataSource
+            + "/" + workingDay;
+        // final output fileName
+        mergeArgs[3] = dataSource + "_" + workingDay;
+        // delete rolling directory
+        mergeArgs[4] = rollingFolder + "/daily/" + workingDay + "/" + cluster
+            + "/" + dataSource;
+
+        log.info("DailyChukwaRecordRolling 0: " + mergeArgs[0]);
+        log.info("DailyChukwaRecordRolling 1: " + mergeArgs[1]);
+        log.info("DailyChukwaRecordRolling 2: " + mergeArgs[2]);
+        log.info("DailyChukwaRecordRolling 3: " + mergeArgs[3]);
+        log.info("DailyChukwaRecordRolling 4: " + mergeArgs[4]);
+
+        RecordMerger merge = new RecordMerger(conf, fs,
+            new DailyChukwaRecordRolling(), mergeArgs, deleteRawdata);
+        List<RecordMerger> allMerge = new ArrayList<RecordMerger>();
+        if (rollInSequence) {
+          merge.run();
+        } else {
+          allMerge.add(merge);
+          merge.start();
+        }
+
+        // join all Threads
+        if (!rollInSequence) {
+          while (allMerge.size() > 0) {
+            RecordMerger m = allMerge.remove(0);
+            try {
+              m.join();
+            } catch (InterruptedException e) {
+            }
+          }
+        } // End if (!rollInSequence)
+
+        // Delete the processed dataSourceFS
+        FileUtil.fullyDelete(fs, dataSourceFS.getPath());
+
+      } // End for(FileStatus dataSourceFS : dataSourcesFS)
+
+      // Delete the processed clusterFs
+      FileUtil.fullyDelete(fs, clusterFs.getPath());
+
+    } // End for(FileStatus clusterFs : clustersFS)
+
+    // Delete the processed dayPath
+    FileUtil.fullyDelete(fs, dayPath);
+  }
+
+  /**
+   * @param args
+   * @throws Exception
+   */
+  public static void main(String[] args) throws Exception {
+    conf = new ChukwaConfiguration();
+    String fsName = conf.get("writer.hdfs.filesystem");
+    fs = FileSystem.get(new URI(fsName), conf);
+
+    // TODO read from config
+    String rollingFolder = "/chukwa/rolling/";
+    String chukwaMainRepository = "/chukwa/repos/";
+    String tempDir = "/chukwa/temp/dailyRolling/";
+
+    // TODO do a real parameter parsing
+    if (args.length != 4) {
+      usage();
+    }
+
+    if (!args[0].equalsIgnoreCase("rollInSequence")) {
+      usage();
+    }
+
+    if (!args[2].equalsIgnoreCase("deleteRawdata")) {
+      usage();
+    }
+
+    if (args[1].equalsIgnoreCase("true")) {
+      rollInSequence = true;
+    } else {
+      rollInSequence = false;
+    }
+
+    if (args[3].equalsIgnoreCase("true")) {
+      deleteRawdata = true;
+    } else {
+      deleteRawdata = false;
+    }
+
+    log.info("rollInSequence: " + rollInSequence);
+    log.info("deleteRawdata: " + deleteRawdata);
+
+    Calendar calendar = Calendar.getInstance();
+    int currentDay = Integer.parseInt(sdf.format(calendar.getTime()));
+    int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
+    log.info("CurrentDay: " + currentDay);
+    log.info("currentHour" + currentHour);
+
+    Path rootFolder = new Path(rollingFolder + "/daily/");
+
+    FileStatus[] daysFS = fs.listStatus(rootFolder);
+    for (FileStatus dayFS : daysFS) {
+      try {
+        int workingDay = Integer.parseInt(dayFS.getPath().getName());
+        if (workingDay < currentDay) {
+          buildDailyFiles(chukwaMainRepository, tempDir, rollingFolder,
+              workingDay);
+        } // End if ( workingDay < currentDay)
+      } // End Try workingDay =
+        // Integer.parseInt(sdf.format(dayFS.getPath().getName()));
+      catch (NumberFormatException e) { /* Not a standard Day directory skip */
+      }
+
+    } // for(FileStatus dayFS : daysFS)
+  }
+
+  public int run(String[] args) throws Exception {
+    JobConf conf = new JobConf(getConf(), DailyChukwaRecordRolling.class);
+
+    conf.setJobName("DailyChukwa-Rolling");
+    conf.setInputFormat(SequenceFileInputFormat.class);
+
+    conf.setMapperClass(IdentityMapper.class);
+    conf.setReducerClass(IdentityReducer.class);
+
+    conf.setOutputKeyClass(ChukwaRecordKey.class);
+    conf.setOutputValueClass(ChukwaRecord.class);
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
+
+    conf.set("mapred.compress.map.output", "true");
+    conf.set("mapred.map.output.compression.codec",
+        "org.apache.hadoop.io.compress.LzoCodec");
+    conf.set("mapred.output.compress", "true");
+    conf.set("mapred.output.compression.type", "BLOCK");
+
+    log.info("DailyChukwaRecordRolling input: " + args[0]);
+    log.info("DailyChukwaRecordRolling output: " + args[1]);
+
+    FileInputFormat.setInputPaths(conf, args[0]);
+    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
+
+    JobClient.runJob(conf);
+    return 0;
+  }
+
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java Wed Mar 11 22:39:26 2009
@@ -18,13 +18,13 @@
 
 package org.apache.hadoop.chukwa.extraction.demux;
 
+
 import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
-
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.chukwa.extraction.demux.processor.ChukwaOutputCollector;
@@ -50,164 +50,152 @@
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.log4j.Logger;
 
-public class Demux extends Configured implements Tool
-{
-	static Logger log = Logger.getLogger(Demux.class);
-	static SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd_HH_mm");
-	 
-	public static class MapClass extends MapReduceBase implements
-			Mapper<ChukwaArchiveKey, ChunkImpl , ChukwaRecordKey, ChukwaRecord>
-	{
-		JobConf jobConf = null;
-		
-		@Override
-		public void configure(JobConf jobConf)
-		{
-			super.configure(jobConf);
-			this.jobConf = jobConf;
-		}
-
-		public void map(ChukwaArchiveKey key, ChunkImpl chunk,
-				OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
-				throws IOException
-		{
-		
-		  ChukwaOutputCollector chukwaOutputCollector = new ChukwaOutputCollector("DemuxMapOutput",output,reporter);
-			try 
-			{
-				long duration = System.currentTimeMillis();
-				if (log.isDebugEnabled())
-				{
-					log.debug("Entry: ["+ chunk.getData() + "] EventType: [" + chunk.getDataType() + "]");	
-				}
-				String processorClass = jobConf.get(chunk.getDataType(), 
-							"org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
-			 
-				if (!processorClass.equalsIgnoreCase("Drop"))
-				{
-				  reporter.incrCounter("DemuxMapInput", "total chunks", 1);
-				  reporter.incrCounter("DemuxMapInput", chunk.getDataType() + " chunks" , 1);
-          
-					MapProcessor processor = MapProcessorFactory.getProcessor(processorClass);
-					processor.process(key,chunk, chukwaOutputCollector, reporter);
-					if (log.isDebugEnabled())
-					{	
-						duration = System.currentTimeMillis() - duration;
-						log.debug("Demux:Map dataType:" + chunk.getDataType() + 
-							" duration:" + duration + " processor:" + processorClass + " recordCount:" + chunk.getRecordOffsets().length );
-					}
-					
-				}
-				else
-				{
-					log.info("action:Demux, dataType:" + chunk.getDataType() +
-							" duration:0 processor:Drop recordCount:" + chunk.getRecordOffsets().length );
-				}
-				
-			} 
-			catch(Exception e) 
-			{
-				log.warn("Exception in Demux:MAP", e);
-				e.printStackTrace();
-			}
-		}
-	}
-
-	 public static class ReduceClass extends MapReduceBase implements
-			Reducer<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, ChukwaRecord>
-	{
-		public void reduce(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
-				OutputCollector<ChukwaRecordKey, ChukwaRecord> output,
-				Reporter reporter) throws IOException
-		{
-		  ChukwaOutputCollector chukwaOutputCollector = new ChukwaOutputCollector("DemuxReduceOutput",output,reporter);
-			try 
-			{
-				long duration = System.currentTimeMillis();
+public class Demux extends Configured implements Tool {
+  static Logger log = Logger.getLogger(Demux.class);
+  static SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd_HH_mm");
+
+  public static class MapClass extends MapReduceBase implements
+      Mapper<ChukwaArchiveKey, ChunkImpl, ChukwaRecordKey, ChukwaRecord> {
+    JobConf jobConf = null;
+
+    @Override
+    public void configure(JobConf jobConf) {
+      super.configure(jobConf);
+      this.jobConf = jobConf;
+    }
+
+    public void map(ChukwaArchiveKey key, ChunkImpl chunk,
+        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+        throws IOException {
+
+      ChukwaOutputCollector chukwaOutputCollector = new ChukwaOutputCollector(
+          "DemuxMapOutput", output, reporter);
+      try {
+        long duration = System.currentTimeMillis();
+        if (log.isDebugEnabled()) {
+          log.debug("Entry: [" + chunk.getData() + "] EventType: ["
+              + chunk.getDataType() + "]");
+        }
+        String processorClass = jobConf
+            .get(chunk.getDataType(),
+                "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
+
+        if (!processorClass.equalsIgnoreCase("Drop")) {
+          reporter.incrCounter("DemuxMapInput", "total chunks", 1);
+          reporter.incrCounter("DemuxMapInput",
+              chunk.getDataType() + " chunks", 1);
+
+          MapProcessor processor = MapProcessorFactory
+              .getProcessor(processorClass);
+          processor.process(key, chunk, chukwaOutputCollector, reporter);
+          if (log.isDebugEnabled()) {
+            duration = System.currentTimeMillis() - duration;
+            log.debug("Demux:Map dataType:" + chunk.getDataType()
+                + " duration:" + duration + " processor:" + processorClass
+                + " recordCount:" + chunk.getRecordOffsets().length);
+          }
+
+        } else {
+          log.info("action:Demux, dataType:" + chunk.getDataType()
+              + " duration:0 processor:Drop recordCount:"
+              + chunk.getRecordOffsets().length);
+        }
+
+      } catch (Exception e) {
+        log.warn("Exception in Demux:MAP", e);
+        e.printStackTrace();
+      }
+    }
+  }
+
+  public static class ReduceClass extends MapReduceBase implements
+      Reducer<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, ChukwaRecord> {
+    public void reduce(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
+        OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter)
+        throws IOException {
+      ChukwaOutputCollector chukwaOutputCollector = new ChukwaOutputCollector(
+          "DemuxReduceOutput", output, reporter);
+      try {
+        long duration = System.currentTimeMillis();
         reporter.incrCounter("DemuxReduceInput", "total distinct keys", 1);
-        reporter.incrCounter("DemuxReduceInput",  key.getReduceType() +" total distinct keys" , 1);
-        
-        ReduceProcessorFactory.getProcessor(key.getReduceType()).process(key,values, chukwaOutputCollector, reporter);
-
-				if (log.isDebugEnabled())
-				{	
-					duration = System.currentTimeMillis() - duration;
-					log.debug("Demux:Reduce, dataType:" + key.getReduceType() +" duration:" + duration);
-				}
-				
-			} 
-			catch(Exception e) 
-			{
-				log.warn("Exception in Demux:Reduce", e);
-				e.printStackTrace();
-			}
-		}
-	}
-	
-	static int printUsage() {
-		System.out
-				.println("Demux [-m <maps>] [-r <reduces>] <input> <output>");
-		ToolRunner.printGenericCommandUsage(System.out);
-		return -1;
-	}
-
-	public int run(String[] args) throws Exception
-	{
-		JobConf conf = new JobConf(getConf(), Demux.class);
-		 conf.addResource(new Path("conf/chukwa-demux-conf.xml"));
-		
-		conf.setJobName("Chukwa-Demux_" + day.format(new Date()));
-		conf.setInputFormat(SequenceFileInputFormat.class);
-		conf.setMapperClass(Demux.MapClass.class);
-		conf.setPartitionerClass(ChukwaRecordPartitioner.class);
-		conf.setReducerClass(Demux.ReduceClass.class);
-
-		conf.setOutputKeyClass(ChukwaRecordKey.class);
-		conf.setOutputValueClass(ChukwaRecord.class);
-		conf.setOutputFormat(ChukwaRecordOutputFormat.class);
-
-//    conf.setCompressMapOutput(true);
- //   conf.setMapOutputCompressorClass(LzoCodec.class);
-		
-		
-		List<String> other_args = new ArrayList<String>();
-		for (int i = 0; i < args.length; ++i) {
-			try {
-				if ("-m".equals(args[i])) {
-					conf.setNumMapTasks(Integer.parseInt(args[++i]));
-				} else if ("-r".equals(args[i])) {
-					conf.setNumReduceTasks(Integer.parseInt(args[++i]));
-				} else 	{
-					other_args.add(args[i]);
-				}
-			} catch (NumberFormatException except) {
-				System.out.println("ERROR: Integer expected instead of "
-						+ args[i]);
-				return printUsage();
-			} catch (ArrayIndexOutOfBoundsException except) {
-				System.out.println("ERROR: Required parameter missing from "
-						+ args[i - 1]);
-				return printUsage();
-			}
-		}
-		// Make sure there are exactly 2 parameters left.
-		if (other_args.size() != 2) {
-			System.out.println("ERROR: Wrong number of parameters: "
-					+ other_args.size() + " instead of 2.");
-			return printUsage();
-		}
-		
-		FileInputFormat.setInputPaths(conf, other_args.get(0));
-		FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
-
-		JobClient.runJob(conf);
-		return 0;
-	}
-
-	public static void main(String[] args) throws Exception {
-		int res = ToolRunner.run(new Configuration(),
-				new Demux(), args);
-		System.exit(res);
-	}
+        reporter.incrCounter("DemuxReduceInput", key.getReduceType()
+            + " total distinct keys", 1);
+
+        ReduceProcessorFactory.getProcessor(key.getReduceType()).process(key,
+            values, chukwaOutputCollector, reporter);
+
+        if (log.isDebugEnabled()) {
+          duration = System.currentTimeMillis() - duration;
+          log.debug("Demux:Reduce, dataType:" + key.getReduceType()
+              + " duration:" + duration);
+        }
+
+      } catch (Exception e) {
+        log.warn("Exception in Demux:Reduce", e);
+        e.printStackTrace();
+      }
+    }
+  }
+
+  static int printUsage() {
+    System.out.println("Demux [-m <maps>] [-r <reduces>] <input> <output>");
+    ToolRunner.printGenericCommandUsage(System.out);
+    return -1;
+  }
+
+  public int run(String[] args) throws Exception {
+    JobConf conf = new JobConf(getConf(), Demux.class);
+    conf.addResource(new Path("conf/chukwa-demux-conf.xml"));
+
+    conf.setJobName("Chukwa-Demux_" + day.format(new Date()));
+    conf.setInputFormat(SequenceFileInputFormat.class);
+    conf.setMapperClass(Demux.MapClass.class);
+    conf.setPartitionerClass(ChukwaRecordPartitioner.class);
+    conf.setReducerClass(Demux.ReduceClass.class);
+
+    conf.setOutputKeyClass(ChukwaRecordKey.class);
+    conf.setOutputValueClass(ChukwaRecord.class);
+    conf.setOutputFormat(ChukwaRecordOutputFormat.class);
+
+    // conf.setCompressMapOutput(true);
+    // conf.setMapOutputCompressorClass(LzoCodec.class);
+
+    List<String> other_args = new ArrayList<String>();
+    for (int i = 0; i < args.length; ++i) {
+      try {
+        if ("-m".equals(args[i])) {
+          conf.setNumMapTasks(Integer.parseInt(args[++i]));
+        } else if ("-r".equals(args[i])) {
+          conf.setNumReduceTasks(Integer.parseInt(args[++i]));
+        } else {
+          other_args.add(args[i]);
+        }
+      } catch (NumberFormatException except) {
+        System.out.println("ERROR: Integer expected instead of " + args[i]);
+        return printUsage();
+      } catch (ArrayIndexOutOfBoundsException except) {
+        System.out.println("ERROR: Required parameter missing from "
+            + args[i - 1]);
+        return printUsage();
+      }
+    }
+    // Make sure there are exactly 2 parameters left.
+    if (other_args.size() != 2) {
+      System.out.println("ERROR: Wrong number of parameters: "
+          + other_args.size() + " instead of 2.");
+      return printUsage();
+    }
+
+    FileInputFormat.setInputPaths(conf, other_args.get(0));
+    FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
+
+    JobClient.runJob(conf);
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new Demux(), args);
+    System.exit(res);
+  }
 
 }
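
For convenience, a minimal sketch of driving the reformatted Demux tool programmatically. It only restates the options run() already parses (-m, -r, then <input> and <output>); the map/reduce counts and HDFS paths below are assumed placeholder values, not part of this commit.

import org.apache.hadoop.chukwa.extraction.demux.Demux;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

public class DemuxLauncherSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical values: tune -m/-r to the cluster and point the paths
    // at the real dataSink input and demux output directories.
    String[] demuxArgs = { "-m", "8", "-r", "4",
        "/chukwa/demuxProcessing/mrInput",
        "/chukwa/demuxProcessing/mrOutput" };
    int res = ToolRunner.run(new Configuration(), new Demux(), demuxArgs);
    System.exit(res);
  }
}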

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java Wed Mar 11 22:39:26 2009
@@ -18,13 +18,13 @@
 
 package org.apache.hadoop.chukwa.extraction.demux;
 
+
 import java.io.IOException;
 import java.net.URI;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.List;
-
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
@@ -45,210 +45,213 @@
 import org.apache.log4j.Logger;
 
 // TODO do an abstract class for all rolling 
-public class HourlyChukwaRecordRolling extends Configured implements Tool
-{
-	static Logger log = Logger.getLogger(HourlyChukwaRecordRolling.class);
-	
-	static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
-	static ChukwaConfiguration conf = null;
-	static FileSystem fs = null;
-	static final String HadoopLogDir = "_logs";
-	static final String hadoopTempDir = "_temporary";
-	
-	static boolean rollInSequence = true;
-	static boolean deleteRawdata = false;
-
-	public static void usage()
-	{
-		System.err.println("usage: java org.apache.hadoop.chukwa.extraction.demux.HourlyChukwaRecordRolling rollInSequence <True/False> deleteRawdata <True/False>");
-		System.exit(-1);
-	}
-	
-	
-	public static void buildHourlyFiles(String chukwaMainRepository, String tempDir,String rollingFolder, int workingDay, int workingHour) throws IOException
-	{
-		// process
-		Path hourPath = new Path(rollingFolder + "/hourly/" + workingDay + "/" + workingHour) ;
-		FileStatus[] clustersFS = fs.listStatus(hourPath);
-		for(FileStatus clusterFs : clustersFS)
-		{
-			String cluster = clusterFs.getPath().getName();
-			
-			Path dataSourceClusterHourPaths = new Path(rollingFolder + "/hourly/" + workingDay + "/" + workingHour + "/" + cluster) ;
-			FileStatus[] dataSourcesFS = fs.listStatus(dataSourceClusterHourPaths);
-			for(FileStatus dataSourceFS : dataSourcesFS)
-			{
-				String dataSource = dataSourceFS.getPath().getName();
-				// Repo path = reposRootDirectory/<cluster>/<day>/<hour>/*/*.evt
-				
-				// put the rotate flag
-				fs.mkdirs(new Path(chukwaMainRepository + "/" + cluster + "/" + dataSource + "/" + workingDay + "/" + workingHour + "/rotateDone"));
-				
-				// rotate
-				// Merge
-				String[] mergeArgs = new String[5];
-				// input
-				mergeArgs[0] = chukwaMainRepository + "/" + cluster + "/" + dataSource + "/" + workingDay + "/" + workingHour + "/[0-5]*/*.evt";
-				// temp dir
-				mergeArgs[1] = tempDir + "/" + cluster + "/" + dataSource + "/" + workingDay + "/" + workingHour + "_" + System.currentTimeMillis() ;
-				// final output dir
-				mergeArgs[2] = chukwaMainRepository + "/" + cluster + "/" + dataSource + "/" + workingDay + "/" + workingHour ;
-				// final output fileName
-				mergeArgs[3] =  dataSource +"_" + workingDay +"_" + workingHour;
-				// delete rolling directory
-				mergeArgs[4] = rollingFolder + "/hourly/" + workingDay + "/" + workingHour + "/" + cluster + "/" + dataSource; 
-						
-				
-				log.info("HourlyChukwaRecordRolling 0: " +  mergeArgs[0] );
-				log.info("HourlyChukwaRecordRolling 1: " +  mergeArgs[1] );
-				log.info("HourlyChukwaRecordRolling 2: " +  mergeArgs[2] );
-				log.info("HourlyChukwaRecordRolling 3: " +  mergeArgs[3] );
-				log.info("HourlyChukwaRecordRolling 4: " +  mergeArgs[4] );
-				
-				RecordMerger merge = new RecordMerger(conf,fs,new HourlyChukwaRecordRolling(), mergeArgs,deleteRawdata);
-				List<RecordMerger> allMerge = new ArrayList<RecordMerger>();
-				if (rollInSequence)
-				{ merge.run(); }
-				else
-				{ 
-					allMerge.add(merge);
-					merge.start();
-				}
-				
-				// join all Threads
-				if (!rollInSequence)
-				{
-					while(allMerge.size() > 0)
-					{
-						RecordMerger m = allMerge.remove(0);
-						try
-						{ m.join(); } 
-						catch (InterruptedException e) {}
-					}
-				} // End if (!rollInSequence)
-				
-				// Delete the processed dataSourceFS
-				FileUtil.fullyDelete(fs,dataSourceFS.getPath());
-				
-			} // End for(FileStatus dataSourceFS : dataSourcesFS)
-			
-			// Delete the processed clusterFs
-			FileUtil.fullyDelete(fs,clusterFs.getPath());
-			
-		} // End for(FileStatus clusterFs : clustersFS)
-		
-		// Delete the processed hour
-		FileUtil.fullyDelete(fs,hourPath);
-	}
-	
-	/**
-	 * @param args
-	 * @throws Exception 
-	 */
-	public static void main(String[] args) throws  Exception
-	{
-		conf = new ChukwaConfiguration();
-		String fsName = conf.get("writer.hdfs.filesystem");
-		fs = FileSystem.get(new URI(fsName), conf);
-		
-		// TODO read from config
-		String rollingFolder = "/chukwa/rolling/";
-		String chukwaMainRepository = "/chukwa/repos/";
-		String tempDir = "/chukwa/temp/hourlyRolling/";
-		
-		
-		// TODO do a real parameter parsing
-		if (args.length != 4)
-		 { usage(); }
-		
-		if (!args[0].equalsIgnoreCase("rollInSequence"))
-		 { usage(); }
-		
-		if (!args[2].equalsIgnoreCase("deleteRawdata"))
-		 { usage(); }
-			
-		if (args[1].equalsIgnoreCase("true"))
-		 { rollInSequence = true; }
-		else
-		 { rollInSequence = false; }
-		
-		if (args[3].equalsIgnoreCase("true"))
-		 { deleteRawdata = true; }
-		else
-		 { deleteRawdata = false; }
-		
-
-		
-		Calendar calendar = Calendar.getInstance();	
-		int currentDay = Integer.parseInt(sdf.format(calendar.getTime()));
-		int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
-		log.info("CurrentDay: " + currentDay);
-		log.info("currentHour" + currentHour);
-	
-		Path rootFolder = new Path(rollingFolder + "/hourly/") ;
-		
-		FileStatus[] daysFS = fs.listStatus(rootFolder);
-		for(FileStatus dayFS : daysFS)
-		{
-			try
-			{ 
-				log.info("dayFs:" + dayFS.getPath().getName());
-				int workingDay = Integer.parseInt(dayFS.getPath().getName());
-				
-				Path hourlySrc = new Path(rollingFolder + "/hourly/" + workingDay) ;
-				FileStatus[] hoursFS = fs.listStatus(hourlySrc);
-				for(FileStatus hourFS : hoursFS)
-				{
-					String workinhHourStr = hourFS.getPath().getName();
-					int workingHour = Integer.parseInt(workinhHourStr);
-					if ( 	
-							(workingDay < currentDay) || // all previous days
-							( (workingDay == currentDay) &&  (workingHour < currentHour) ) // Up to the last hour
-						) 
-					{
-
-						buildHourlyFiles(chukwaMainRepository,tempDir,rollingFolder, workingDay,workingHour);
-						
-					} // End if ( (workingDay < currentDay) || ( (workingDay == currentDay) &&  (intHour < currentHour) ) )
-				} // End for(FileStatus hourFS : hoursFS)		
-			} // End Try workingDay = Integer.parseInt(sdf.format(dayFS.getPath().getName()));
-			catch(NumberFormatException e)
-			{ /* Not a standard Day directory skip */ }
-			
-		} // for(FileStatus dayFS : daysFS)		
-	}
-	
-	
-	public int run(String[] args) throws Exception
-	{
-		JobConf conf = new JobConf(getConf(), HourlyChukwaRecordRolling.class);
-
-		conf.setJobName("HourlyChukwa-Rolling");
-		conf.setInputFormat(SequenceFileInputFormat.class);
-		
-		conf.setMapperClass(IdentityMapper.class);
-		conf.setReducerClass(IdentityReducer.class);
-
-				
-		conf.setOutputKeyClass(ChukwaRecordKey.class);
-		conf.setOutputValueClass(ChukwaRecord.class);
-		conf.setOutputFormat(SequenceFileOutputFormat.class);
-		
-		conf.set("mapred.compress.map.output", "true");
-		conf.set("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.LzoCodec");
-		conf.set("mapred.output.compress", "true");
-		conf.set("mapred.output.compression.type", "BLOCK");
-		
-		
-		log.info("HourlyChukwaRecordRolling input: " +  args[0] );
-		log.info("HourlyChukwaRecordRolling output: " +  args[1] );
-		
-		
-		FileInputFormat.setInputPaths(conf, args[0]);
-		FileOutputFormat.setOutputPath(conf, new Path(args[1]));
-
-		JobClient.runJob(conf);
-		return 0;
-	}
-	
+public class HourlyChukwaRecordRolling extends Configured implements Tool {
+  static Logger log = Logger.getLogger(HourlyChukwaRecordRolling.class);
+
+  static SimpleDateFormat sdf = new java.text.SimpleDateFormat("yyyyMMdd");
+  static ChukwaConfiguration conf = null;
+  static FileSystem fs = null;
+  static final String HadoopLogDir = "_logs";
+  static final String hadoopTempDir = "_temporary";
+
+  static boolean rollInSequence = true;
+  static boolean deleteRawdata = false;
+
+  public static void usage() {
+    System.err
+        .println("usage: java org.apache.hadoop.chukwa.extraction.demux.HourlyChukwaRecordRolling rollInSequence <True/False> deleteRawdata <True/False>");
+    System.exit(-1);
+  }
+
+  public static void buildHourlyFiles(String chukwaMainRepository,
+      String tempDir, String rollingFolder, int workingDay, int workingHour)
+      throws IOException {
+    // process
+    Path hourPath = new Path(rollingFolder + "/hourly/" + workingDay + "/"
+        + workingHour);
+    FileStatus[] clustersFS = fs.listStatus(hourPath);
+    for (FileStatus clusterFs : clustersFS) {
+      String cluster = clusterFs.getPath().getName();
+
+      Path dataSourceClusterHourPaths = new Path(rollingFolder + "/hourly/"
+          + workingDay + "/" + workingHour + "/" + cluster);
+      FileStatus[] dataSourcesFS = fs.listStatus(dataSourceClusterHourPaths);
+      for (FileStatus dataSourceFS : dataSourcesFS) {
+        String dataSource = dataSourceFS.getPath().getName();
+        // Repo path = reposRootDirectory/<cluster>/<day>/<hour>/*/*.evt
+
+        // put the rotate flag
+        fs.mkdirs(new Path(chukwaMainRepository + "/" + cluster + "/"
+            + dataSource + "/" + workingDay + "/" + workingHour
+            + "/rotateDone"));
+
+        // rotate
+        // Merge
+        String[] mergeArgs = new String[5];
+        // input
+        mergeArgs[0] = chukwaMainRepository + "/" + cluster + "/" + dataSource
+            + "/" + workingDay + "/" + workingHour + "/[0-5]*/*.evt";
+        // temp dir
+        mergeArgs[1] = tempDir + "/" + cluster + "/" + dataSource + "/"
+            + workingDay + "/" + workingHour + "_" + System.currentTimeMillis();
+        // final output dir
+        mergeArgs[2] = chukwaMainRepository + "/" + cluster + "/" + dataSource
+            + "/" + workingDay + "/" + workingHour;
+        // final output fileName
+        mergeArgs[3] = dataSource + "_" + workingDay + "_" + workingHour;
+        // delete rolling directory
+        mergeArgs[4] = rollingFolder + "/hourly/" + workingDay + "/"
+            + workingHour + "/" + cluster + "/" + dataSource;
+
+        log.info("HourlyChukwaRecordRolling 0: " + mergeArgs[0]);
+        log.info("HourlyChukwaRecordRolling 1: " + mergeArgs[1]);
+        log.info("HourlyChukwaRecordRolling 2: " + mergeArgs[2]);
+        log.info("HourlyChukwaRecordRolling 3: " + mergeArgs[3]);
+        log.info("HourlyChukwaRecordRolling 4: " + mergeArgs[4]);
+
+        RecordMerger merge = new RecordMerger(conf, fs,
+            new HourlyChukwaRecordRolling(), mergeArgs, deleteRawdata);
+        List<RecordMerger> allMerge = new ArrayList<RecordMerger>();
+        if (rollInSequence) {
+          merge.run();
+        } else {
+          allMerge.add(merge);
+          merge.start();
+        }
+
+        // join all Threads
+        if (!rollInSequence) {
+          while (allMerge.size() > 0) {
+            RecordMerger m = allMerge.remove(0);
+            try {
+              m.join();
+            } catch (InterruptedException e) {
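+              // Ignore interruption; keep joining the remaining merge threads.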
+            }
+          }
+        } // End if (!rollInSequence)
+
+        // Delete the processed dataSourceFS
+        FileUtil.fullyDelete(fs, dataSourceFS.getPath());
+
+      } // End for(FileStatus dataSourceFS : dataSourcesFS)
+
+      // Delete the processed clusterFs
+      FileUtil.fullyDelete(fs, clusterFs.getPath());
+
+    } // End for(FileStatus clusterFs : clustersFS)
+
+    // Delete the processed hour
+    FileUtil.fullyDelete(fs, hourPath);
+  }
+
+  /**
+   * @param args
+   * @throws Exception
+   */
+  public static void main(String[] args) throws Exception {
+    conf = new ChukwaConfiguration();
+    String fsName = conf.get("writer.hdfs.filesystem");
+    fs = FileSystem.get(new URI(fsName), conf);
+
+    // TODO read from config
+    String rollingFolder = "/chukwa/rolling/";
+    String chukwaMainRepository = "/chukwa/repos/";
+    String tempDir = "/chukwa/temp/hourlyRolling/";
+
+    // TODO do a real parameter parsing
+    if (args.length != 4) {
+      usage();
+    }
+
+    if (!args[0].equalsIgnoreCase("rollInSequence")) {
+      usage();
+    }
+
+    if (!args[2].equalsIgnoreCase("deleteRawdata")) {
+      usage();
+    }
+
+    if (args[1].equalsIgnoreCase("true")) {
+      rollInSequence = true;
+    } else {
+      rollInSequence = false;
+    }
+
+    if (args[3].equalsIgnoreCase("true")) {
+      deleteRawdata = true;
+    } else {
+      deleteRawdata = false;
+    }
+
+    Calendar calendar = Calendar.getInstance();
+    int currentDay = Integer.parseInt(sdf.format(calendar.getTime()));
+    int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
+    log.info("CurrentDay: " + currentDay);
+    log.info("currentHour" + currentHour);
+
+    Path rootFolder = new Path(rollingFolder + "/hourly/");
+
+    FileStatus[] daysFS = fs.listStatus(rootFolder);
+    for (FileStatus dayFS : daysFS) {
+      try {
+        log.info("dayFs:" + dayFS.getPath().getName());
+        int workingDay = Integer.parseInt(dayFS.getPath().getName());
+
+        Path hourlySrc = new Path(rollingFolder + "/hourly/" + workingDay);
+        FileStatus[] hoursFS = fs.listStatus(hourlySrc);
+        for (FileStatus hourFS : hoursFS) {
+          String workingHourStr = hourFS.getPath().getName();
+          int workingHour = Integer.parseInt(workingHourStr);
+          if ((workingDay < currentDay) || // all previous days
+              ((workingDay == currentDay) && (workingHour < currentHour))) { // up to the last hour
+
+            buildHourlyFiles(chukwaMainRepository, tempDir, rollingFolder,
+                workingDay, workingHour);
+
+          } // End if ((workingDay < currentDay) || ((workingDay == currentDay) && (workingHour < currentHour)))
+        } // End for (FileStatus hourFS : hoursFS)
+      } // End try around workingDay = Integer.parseInt(dayFS.getPath().getName())
+      catch (NumberFormatException e) {
+        // Not a standard day directory; skip it.
+      }
+
+    } // for(FileStatus dayFS : daysFS)
+  }
+
+  public int run(String[] args) throws Exception {
+    JobConf conf = new JobConf(getConf(), HourlyChukwaRecordRolling.class);
+
+    conf.setJobName("HourlyChukwa-Rolling");
+    conf.setInputFormat(SequenceFileInputFormat.class);
+
+    conf.setMapperClass(IdentityMapper.class);
+    conf.setReducerClass(IdentityReducer.class);
+
+    conf.setOutputKeyClass(ChukwaRecordKey.class);
+    conf.setOutputValueClass(ChukwaRecord.class);
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
+
+    conf.set("mapred.compress.map.output", "true");
+    conf.set("mapred.map.output.compression.codec",
+        "org.apache.hadoop.io.compress.LzoCodec");
+    conf.set("mapred.output.compress", "true");
+    conf.set("mapred.output.compression.type", "BLOCK");
+
+    log.info("HourlyChukwaRecordRolling input: " + args[0]);
+    log.info("HourlyChukwaRecordRolling output: " + args[1]);
+
+    FileInputFormat.setInputPaths(conf, args[0]);
+    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
+
+    JobClient.runJob(conf);
+    return 0;
+  }
+
 }
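
To make the directory conventions above easier to follow, here is a short, self-contained sketch that rebuilds the five arguments buildHourlyFiles() hands to RecordMerger. The cluster, data source, day, and hour values are assumed examples; only the path patterns themselves come from the code in this file.

public class HourlyRollingPathsExample {
  public static void main(String[] args) {
    // Same base directories as HourlyChukwaRecordRolling.main(); the rest are assumed examples.
    String rollingFolder = "/chukwa/rolling/";
    String chukwaMainRepository = "/chukwa/repos/";
    String tempDir = "/chukwa/temp/hourlyRolling/";
    String cluster = "demoCluster";   // assumed
    String dataSource = "Hadoop_dfs"; // assumed
    int workingDay = 20090311;        // yyyyMMdd, assumed
    int workingHour = 22;             // assumed

    // The five arguments buildHourlyFiles() passes to RecordMerger:
    String input = chukwaMainRepository + "/" + cluster + "/" + dataSource
        + "/" + workingDay + "/" + workingHour + "/[0-5]*/*.evt";
    String temp = tempDir + "/" + cluster + "/" + dataSource + "/"
        + workingDay + "/" + workingHour + "_" + System.currentTimeMillis();
    String finalDir = chukwaMainRepository + "/" + cluster + "/" + dataSource
        + "/" + workingDay + "/" + workingHour;
    String finalFileName = dataSource + "_" + workingDay + "_" + workingHour;
    String rollingDirToDelete = rollingFolder + "/hourly/" + workingDay + "/"
        + workingHour + "/" + cluster + "/" + dataSource;

    System.out.println(input + "\n" + temp + "\n" + finalDir + "\n"
        + finalFileName + "\n" + rollingDirToDelete);
  }
}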

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveOrMergeRecordFile.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveOrMergeRecordFile.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveOrMergeRecordFile.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/extraction/demux/MoveOrMergeRecordFile.java Wed Mar 11 22:39:26 2009
@@ -18,9 +18,9 @@
 
 package org.apache.hadoop.chukwa.extraction.demux;
 
+
 import java.io.IOException;
 import java.net.URI;
-
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
@@ -41,198 +41,184 @@
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
-public class MoveOrMergeRecordFile extends Configured implements Tool
-{
-	static ChukwaConfiguration conf = null;
-	static FileSystem fs = null;
-	static final String HadoopLogDir = "_logs";
-	static final String hadoopTempDir = "_temporary";
-	
-	public int run(String[] args) throws Exception
-	{
-		JobConf conf = new JobConf(getConf(), MoveOrMergeRecordFile.class);
-
-		conf.setJobName("Chukwa-MoveOrMergeLogFile");
-		conf.setInputFormat(SequenceFileInputFormat.class);
-		
-		conf.setMapperClass(IdentityMapper.class);
-		conf.setReducerClass(IdentityReducer.class);
-		
-		//conf.setPartitionerClass(ChukwaPartitioner.class);
-		//conf.setOutputFormat(ChukwaOutputFormat.class);
-		
-		conf.setOutputKeyClass(ChukwaRecordKey.class);
-		conf.setOutputValueClass(ChukwaRecord.class);
-		conf.setOutputFormat(SequenceFileOutputFormat.class);
-		
-		FileInputFormat.setInputPaths(conf, args[0]);
-		FileOutputFormat.setOutputPath(conf, new Path(args[1]));
-
-		JobClient.runJob(conf);
-		return 0;
-	}
-
-	
-    static void moveOrMergeOneCluster(Path srcDir,String destDir) throws Exception
-	{
-		System.out.println("moveOrMergeOneCluster (" + srcDir.getName() + "," + destDir +")");
-		FileStatus fstat = fs.getFileStatus(srcDir);
-		
-		if (!fstat.isDir())
-		{
-			throw new IOException(srcDir + " is not a directory!");
-		}
-		else
-		{
-			FileStatus[] datasourceDirectories = fs.listStatus(srcDir);
-			for(FileStatus datasourceDirectory : datasourceDirectories)
-			{
-				System.out.println(datasourceDirectory.getPath() + " isDir?" +datasourceDirectory.isDir());
-				if (!datasourceDirectory.isDir())
-				{
-					throw new IOException("Top level should just contains directories :" + datasourceDirectory.getPath());
-				}
-				
-				String dirName = datasourceDirectory.getPath().getName();
-				
-				Path destPath = new Path(destDir + "/" + dirName);
-				System.out.println("dest directory path: " + destPath);
-				
-				if (!fs.exists(destPath))
-				 {
-					System.out.println("create datasource directory [" + destDir + "/" + dirName + "]");
-					fs.mkdirs(destPath);
-				 }
-				
-				FileStatus[] evts = fs.listStatus(datasourceDirectory.getPath(),new EventFileFilter());
-				for(FileStatus eventFile : evts)	
-				{
-
-					Path eventFilePath = eventFile.getPath();
-					String filename = eventFilePath.getName();
-					System.out.println("src dir File: ["+  filename+"]");					
-					Path destFilePath = new Path(destDir + "/" + dirName + "/" + filename);
-					if (!fs.exists(destFilePath))
-					{
-						System.out.println("Moving File: [" + destFilePath +"]");
-						// Copy to final Location 
-						FileUtil.copy(fs,eventFilePath,fs,destFilePath,false,false,conf);
-					}
-					else
-					{
-						System.out.println("Need to merge! : [" + destFilePath +"]");
-						String strMrPath = datasourceDirectory.getPath().toString()+ "/" + "MR_" + System.currentTimeMillis();
-						Path mrPath = new Path(strMrPath);
-						System.out.println("\t New MR directory : [" + mrPath +"]");
-						// Create MR input Dir
-						fs.mkdirs(mrPath);
-						// Move Input files 
-						FileUtil.copy(fs,eventFilePath,fs,new Path(strMrPath+"/1.evt"),false,false,conf);
-						fs.rename(destFilePath, new Path(strMrPath+"/2.evt"));
-						
-						// Merge
-						String[] mergeArgs = new String[2];
-						mergeArgs[0] = strMrPath;
-						mergeArgs[1] = strMrPath + "/mrOutput";
-						DoMerge merge = new DoMerge(conf,fs,eventFilePath,destFilePath,mergeArgs);
-						merge.start();
-					}
-				}
-			}
-		}
-
-	}
-    
-	/**
-	 * @param args
-	 * @throws Exception 
-	 */
-	public static void main(String[] args) throws Exception
-	{
-		conf = new ChukwaConfiguration();
-		String fsName = conf.get("writer.hdfs.filesystem");
-		fs = FileSystem.get(new URI(fsName), conf);
-		
-		Path srcDir = new Path(args[0]);
-		String destDir = args[1];
-		
-		
-		FileStatus fstat = fs.getFileStatus(srcDir);
-		
-		if (!fstat.isDir())
-		{
-			throw new IOException(srcDir + " is not a directory!");
-		}
-		else
-		{
-			FileStatus[] clusters = fs.listStatus(srcDir);
-			// Run a moveOrMerge on all clusters
-			String name = null;
-			for(FileStatus cluster : clusters)
-			{
-				name = cluster.getPath().getName();
-				// Skip hadoop outDir
-				if ( (name.intern() == HadoopLogDir.intern() ) || (name.intern() == hadoopTempDir.intern()) )
-				{
-					continue;
-				}
-				moveOrMergeOneCluster(cluster.getPath(),destDir + "/" + cluster.getPath().getName());
-			}
-		}
-		System.out.println("Done with moveOrMerge main()");
-	}
+public class MoveOrMergeRecordFile extends Configured implements Tool {
+  static ChukwaConfiguration conf = null;
+  static FileSystem fs = null;
+  static final String HadoopLogDir = "_logs";
+  static final String hadoopTempDir = "_temporary";
+
+  public int run(String[] args) throws Exception {
+    JobConf conf = new JobConf(getConf(), MoveOrMergeRecordFile.class);
+
+    conf.setJobName("Chukwa-MoveOrMergeLogFile");
+    conf.setInputFormat(SequenceFileInputFormat.class);
+
+    conf.setMapperClass(IdentityMapper.class);
+    conf.setReducerClass(IdentityReducer.class);
+
+    // conf.setPartitionerClass(ChukwaPartitioner.class);
+    // conf.setOutputFormat(ChukwaOutputFormat.class);
+
+    conf.setOutputKeyClass(ChukwaRecordKey.class);
+    conf.setOutputValueClass(ChukwaRecord.class);
+    conf.setOutputFormat(SequenceFileOutputFormat.class);
+
+    FileInputFormat.setInputPaths(conf, args[0]);
+    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
+
+    JobClient.runJob(conf);
+    return 0;
+  }
+
+  static void moveOrMergeOneCluster(Path srcDir, String destDir)
+      throws Exception {
+    System.out.println("moveOrMergeOneCluster (" + srcDir.getName() + ","
+        + destDir + ")");
+    FileStatus fstat = fs.getFileStatus(srcDir);
+
+    if (!fstat.isDir()) {
+      throw new IOException(srcDir + " is not a directory!");
+    } else {
+      FileStatus[] datasourceDirectories = fs.listStatus(srcDir);
+      for (FileStatus datasourceDirectory : datasourceDirectories) {
+        System.out.println(datasourceDirectory.getPath() + " isDir? "
+            + datasourceDirectory.isDir());
+        if (!datasourceDirectory.isDir()) {
+          throw new IOException("Top level should just contains directories :"
+              + datasourceDirectory.getPath());
+        }
+
+        String dirName = datasourceDirectory.getPath().getName();
+
+        Path destPath = new Path(destDir + "/" + dirName);
+        System.out.println("dest directory path: " + destPath);
+
+        if (!fs.exists(destPath)) {
+          System.out.println("create datasource directory [" + destDir + "/"
+              + dirName + "]");
+          fs.mkdirs(destPath);
+        }
+
+        FileStatus[] evts = fs.listStatus(datasourceDirectory.getPath(),
+            new EventFileFilter());
+        for (FileStatus eventFile : evts) {
+
+          Path eventFilePath = eventFile.getPath();
+          String filename = eventFilePath.getName();
+          System.out.println("src dir File: [" + filename + "]");
+          Path destFilePath = new Path(destDir + "/" + dirName + "/" + filename);
+          if (!fs.exists(destFilePath)) {
+            System.out.println("Moving File: [" + destFilePath + "]");
+            // Copy to final Location
+            FileUtil.copy(fs, eventFilePath, fs, destFilePath, false, false,
+                conf);
+          } else {
+            System.out.println("Need to merge! : [" + destFilePath + "]");
+            String strMrPath = datasourceDirectory.getPath().toString() + "/"
+                + "MR_" + System.currentTimeMillis();
+            Path mrPath = new Path(strMrPath);
+            System.out.println("\t New MR directory : [" + mrPath + "]");
+            // Create MR input Dir
+            fs.mkdirs(mrPath);
+            // Move Input files
+            FileUtil.copy(fs, eventFilePath, fs,
+                new Path(strMrPath + "/1.evt"), false, false, conf);
+            fs.rename(destFilePath, new Path(strMrPath + "/2.evt"));
+
+            // Merge
+            String[] mergeArgs = new String[2];
+            mergeArgs[0] = strMrPath;
+            mergeArgs[1] = strMrPath + "/mrOutput";
+            DoMerge merge = new DoMerge(conf, fs, eventFilePath, destFilePath,
+                mergeArgs);
+            merge.start();
+          }
+        }
+      }
+    }
+
+  }
+
+  /**
+   * @param args
+   * @throws Exception
+   */
+  public static void main(String[] args) throws Exception {
+    conf = new ChukwaConfiguration();
+    String fsName = conf.get("writer.hdfs.filesystem");
+    fs = FileSystem.get(new URI(fsName), conf);
+
+    Path srcDir = new Path(args[0]);
+    String destDir = args[1];
+
+    FileStatus fstat = fs.getFileStatus(srcDir);
+
+    if (!fstat.isDir()) {
+      throw new IOException(srcDir + " is not a directory!");
+    } else {
+      FileStatus[] clusters = fs.listStatus(srcDir);
+      // Run a moveOrMerge on all clusters
+      String name = null;
+      for (FileStatus cluster : clusters) {
+        name = cluster.getPath().getName();
+        // Skip hadoop outDir
+        if (name.equals(HadoopLogDir) || name.equals(hadoopTempDir)) {
+          continue;
+        }
+        moveOrMergeOneCluster(cluster.getPath(), destDir + "/"
+            + cluster.getPath().getName());
+      }
+    }
+    System.out.println("Done with moveOrMerge main()");
+  }
 }
 
 
-class DoMerge extends Thread
-{
-	ChukwaConfiguration conf = null;
-	FileSystem fs = null;
-	String[] mergeArgs = new String[2];
-	Path destFilePath = null;
-	Path eventFilePath = null;
-	public DoMerge(ChukwaConfiguration conf,FileSystem fs,
-			Path eventFilePath,Path destFilePath,String[] mergeArgs)
-	{
-		this.conf = conf;
-		this.fs = fs;
-		this.eventFilePath = eventFilePath;
-		this.destFilePath = destFilePath;
-		this.mergeArgs = mergeArgs;
-	}
-	@Override
-	public void run()
-	{
-		System.out.println("\t Running Merge! : output [" + mergeArgs[1] +"]");
-		int res;
-		try
-		{
-			res = ToolRunner.run(new ChukwaConfiguration(),new MoveOrMergeRecordFile(), mergeArgs);
-			System.out.println("MR exit status: " + res);
-			if (res == 0)
-			{
-				System.out.println("\t Moving output file : to [" + destFilePath +"]");
-				FileUtil.copy(fs,new Path(mergeArgs[1]+"/part-00000"),fs,destFilePath,false,false,conf);
-				fs.rename(new Path(mergeArgs[1]+"/part-00000"), eventFilePath);
-			}
-			else
-			{
-				throw new RuntimeException("Error in M/R merge operation!");
-			}
-
-		} 
-		catch (Exception e)
-		{
-			e.printStackTrace();
-			throw new RuntimeException("Error in M/R merge operation!",e);
-		}
-	}
-	
+class DoMerge extends Thread {
+  ChukwaConfiguration conf = null;
+  FileSystem fs = null;
+  String[] mergeArgs = new String[2];
+  Path destFilePath = null;
+  Path eventFilePath = null;
+
+  public DoMerge(ChukwaConfiguration conf, FileSystem fs, Path eventFilePath,
+                 Path destFilePath, String[] mergeArgs) {
+    this.conf = conf;
+    this.fs = fs;
+    this.eventFilePath = eventFilePath;
+    this.destFilePath = destFilePath;
+    this.mergeArgs = mergeArgs;
+  }
+
+  @Override
+  public void run() {
+    System.out.println("\t Running Merge! : output [" + mergeArgs[1] + "]");
+    int res;
+    try {
+      res = ToolRunner.run(new ChukwaConfiguration(),
+          new MoveOrMergeRecordFile(), mergeArgs);
+      System.out.println("MR exit status: " + res);
+      if (res == 0) {
+        System.out.println("\t Moving output file : to [" + destFilePath + "]");
+        FileUtil.copy(fs, new Path(mergeArgs[1] + "/part-00000"), fs,
+            destFilePath, false, false, conf);
+        fs.rename(new Path(mergeArgs[1] + "/part-00000"), eventFilePath);
+      } else {
+        throw new RuntimeException("Error in M/R merge operation!");
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      throw new RuntimeException("Error in M/R merge operation!", e);
+    }
+  }
+
 }
 
 
 class EventFileFilter implements PathFilter {
-	  public boolean accept(Path path) {
-	    return (path.toString().endsWith(".evt"));
-	  }
-	}
+  public boolean accept(Path path) {
+    return (path.toString().endsWith(".evt"));
+  }
+}
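
Finally, a brief, hedged illustration of the EventFileFilter above: a Hadoop PathFilter that accepts only *.evt record files, used with FileSystem.listStatus() exactly as moveOrMergeOneCluster() does. The filesystem URI and directory below are assumed placeholders.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListEvtFilesExample {
  public static void main(String[] args) throws Exception {
    // Assumes this class lives in the same package as EventFileFilter (it is package-private).
    Configuration conf = new Configuration();
    // Assumed HDFS URI and data-source directory; substitute real values.
    FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf);
    Path dataSourceDir = new Path("/chukwa/postProcess/demoCluster/Hadoop_dfs");

    // Only *.evt files pass the filter, mirroring moveOrMergeOneCluster().
    FileStatus[] evts = fs.listStatus(dataSourceDir, new EventFileFilter());
    for (FileStatus evt : evts) {
      System.out.println("event file: " + evt.getPath());
    }
  }
}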