Posted to commits@chukwa.apache.org by as...@apache.org on 2009/03/11 23:39:32 UTC

svn commit: r752666 [13/16] - in /hadoop/chukwa/trunk: ./ src/java/org/apache/hadoop/chukwa/ src/java/org/apache/hadoop/chukwa/conf/ src/java/org/apache/hadoop/chukwa/database/ src/java/org/apache/hadoop/chukwa/datacollection/ src/java/org/apache/hadoo...

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java Wed Mar 11 22:39:26 2009
@@ -5,10 +5,9 @@
  * License version 1.1, a copy of which has been included with this
  * distribution in the LICENSE.txt file.  */
 
-
-
 package org.apache.hadoop.chukwa.inputtools.log4j;
 
+
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
@@ -22,7 +21,6 @@
 import java.util.Locale;
 import java.util.TimeZone;
 import java.util.regex.Pattern;
-
 import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
 import org.apache.hadoop.chukwa.util.RecordConstants;
 import org.apache.log4j.FileAppender;
@@ -135,57 +133,56 @@
    <b>DatePattern</b> option. The text before the colon is interpreted
    as the protocol specification of a URL, which is probably not what
     you want. */
-    
+
 public class ChukwaDailyRollingFileAppender extends FileAppender {
 
-	static Logger log = Logger.getLogger(ChukwaDailyRollingFileAppender.class);
+  static Logger log = Logger.getLogger(ChukwaDailyRollingFileAppender.class);
  // The code assumes that the following constants are in an increasing
   // sequence.
-  static final int TOP_OF_TROUBLE=-1;
+  static final int TOP_OF_TROUBLE = -1;
   static final int TOP_OF_MINUTE = 0;
-  static final int TOP_OF_HOUR   = 1;
-  static final int HALF_DAY      = 2;
-  static final int TOP_OF_DAY    = 3;
-  static final int TOP_OF_WEEK   = 4;
-  static final int TOP_OF_MONTH  = 5;
+  static final int TOP_OF_HOUR = 1;
+  static final int HALF_DAY = 2;
+  static final int TOP_OF_DAY = 3;
+  static final int TOP_OF_WEEK = 4;
+  static final int TOP_OF_MONTH = 5;
 
   static final String adaptorType = ChukwaAgentController.CharFileTailUTF8NewLineEscaped;
 
   static final Object lock = new Object();
   static String lastRotation = "";
-  
+
   /**
-    The date pattern. By default, the pattern is set to
-    "'.'yyyy-MM-dd" meaning daily rollover.
+   * The date pattern. By default, the pattern is set to "'.'yyyy-MM-dd" meaning
+   * daily rollover.
    */
   private String datePattern = "'.'yyyy-MM-dd";
 
   /**
-      The log file will be renamed to the value of the
-      scheduledFilename variable when the next interval is entered. For
-      example, if the rollover period is one hour, the log file will be
-      renamed to the value of "scheduledFilename" at the beginning of
-      the next hour. 
-
-      The precise time when a rollover occurs depends on logging
-      activity. 
+   * The log file will be renamed to the value of the scheduledFilename variable
+   * when the next interval is entered. For example, if the rollover period is
+   * one hour, the log file will be renamed to the value of "scheduledFilename"
+   * at the beginning of the next hour.
+   * 
+   * The precise time when a rollover occurs depends on logging activity.
    */
   private String scheduledFilename;
 
   /**
-    The next time we estimate a rollover should occur. */
-  private long nextCheck = System.currentTimeMillis () - 1;
+   * The next time we estimate a rollover should occur.
+   */
+  private long nextCheck = System.currentTimeMillis() - 1;
 
   /**
    * Regex to select log files to be deleted
    */
   private String cleanUpRegex = null;
-  
+
   /**
    * Set the maximum number of backup files to keep around.
    */
   private int maxBackupIndex = 10;
-  
+
   Date now = new Date();
 
   SimpleDateFormat sdf;
@@ -197,45 +194,43 @@
   ChukwaAgentController chukwaClient;
   boolean chukwaClientIsNull = true;
   static final Object chukwaLock = new Object();
-  
+
   String chukwaClientHostname;
   int chukwaClientPortNum;
   long chukwaClientConnectNumRetry;
   long chukwaClientConnectRetryInterval;
 
   String recordType;
-  
-  
-  
-  
+
   // The gmtTimeZone is used only in computeCheckPeriod() method.
   static final TimeZone gmtTimeZone = TimeZone.getTimeZone("GMT");
 
-
   /**
-    The default constructor does nothing. */
-  public ChukwaDailyRollingFileAppender() throws IOException{
+   * The default constructor does nothing.
+   */
+  public ChukwaDailyRollingFileAppender() throws IOException {
     super();
   }
 
-  /**
+/**
      Instantiate a <code>DailyRollingFileAppender</code> and open the
      file designated by <code>filename</code>. The opened filename will
      become the output destination for this appender.
 
    */
-  public ChukwaDailyRollingFileAppender (Layout layout, String filename,
-      String datePattern) throws IOException {
+  public ChukwaDailyRollingFileAppender(Layout layout, String filename,
+                                        String datePattern) throws IOException {
     super(layout, filename, true);
-    System.out.println("Daily Rolling File Appender successfully registered file with agent: " + filename);
+    System.out
+        .println("Daily Rolling File Appender successfully registered file with agent: "
+            + filename);
     this.datePattern = datePattern;
     activateOptions();
   }
 
   /**
-    The <b>DatePattern</b> takes a string in the same format as
-    expected by {@link SimpleDateFormat}. This options determines the
-    rollover schedule.
+   * The <b>DatePattern</b> takes a string in the same format as expected by
+   * {@link SimpleDateFormat}. This option determines the rollover schedule.
    */
   public void setDatePattern(String pattern) {
     datePattern = pattern;
@@ -245,66 +240,64 @@
   public String getDatePattern() {
     return datePattern;
   }
-  
-  public String getRecordType(){
+
+  public String getRecordType() {
     if (recordType != null)
       return recordType;
     else
       return "unknown";
   }
-  
-  public void setRecordType(String recordType){
+
+  public void setRecordType(String recordType) {
     this.recordType = recordType;
   }
 
   public void activateOptions() {
     super.activateOptions();
-    if(datePattern != null && fileName != null) {
+    if (datePattern != null && fileName != null) {
       now.setTime(System.currentTimeMillis());
       sdf = new SimpleDateFormat(datePattern);
       int type = computeCheckPeriod();
       printPeriodicity(type);
       rc.setType(type);
       File file = new File(fileName);
-      scheduledFilename = fileName+sdf.format(new Date(file.lastModified()));
+      scheduledFilename = fileName + sdf.format(new Date(file.lastModified()));
 
     } else {
-      LogLog.error("Either File or DatePattern options are not set for appender ["
-          +name+"].");
+      LogLog
+          .error("Either File or DatePattern options are not set for appender ["
+              + name + "].");
     }
   }
 
   void printPeriodicity(int type) {
-    switch(type) {
+    switch (type) {
     case TOP_OF_MINUTE:
-      LogLog.debug("Appender ["+name+"] to be rolled every minute.");
+      LogLog.debug("Appender [" + name + "] to be rolled every minute.");
       break;
     case TOP_OF_HOUR:
-      LogLog.debug("Appender ["+name
-          +"] to be rolled on top of every hour.");
+      LogLog
+          .debug("Appender [" + name + "] to be rolled on top of every hour.");
       break;
     case HALF_DAY:
-      LogLog.debug("Appender ["+name
-          +"] to be rolled at midday and midnight.");
+      LogLog.debug("Appender [" + name
+          + "] to be rolled at midday and midnight.");
       break;
     case TOP_OF_DAY:
-      LogLog.debug("Appender ["+name
-          +"] to be rolled at midnight.");
+      LogLog.debug("Appender [" + name + "] to be rolled at midnight.");
       break;
     case TOP_OF_WEEK:
-      LogLog.debug("Appender ["+name
-          +"] to be rolled at start of week.");
+      LogLog.debug("Appender [" + name + "] to be rolled at start of week.");
       break;
     case TOP_OF_MONTH:
-      LogLog.debug("Appender ["+name
-          +"] to be rolled at start of every month.");
+      LogLog.debug("Appender [" + name
+          + "] to be rolled at start of every month.");
       break;
     default:
-      LogLog.warn("Unknown periodicity for appender ["+name+"].");
+      LogLog.warn("Unknown periodicity for appender [" + name + "].");
     }
   }
 
-
   // This method computes the roll over period by looping over the
   // periods, starting with the shortest, and stopping when the r0 is
  // different from r1, where r0 is the epoch formatted according
@@ -315,19 +308,21 @@
   // GMT (the epoch).
 
   int computeCheckPeriod() {
-    RollingCalendar rollingCalendar = new RollingCalendar(gmtTimeZone, Locale.ENGLISH);
+    RollingCalendar rollingCalendar = new RollingCalendar(gmtTimeZone,
+        Locale.ENGLISH);
     // set date to 1970-01-01 00:00:00 GMT
     Date epoch = new Date(0);
-    if(datePattern != null) {
-      for(int i = TOP_OF_MINUTE; i <= TOP_OF_MONTH; i++) {
+    if (datePattern != null) {
+      for (int i = TOP_OF_MINUTE; i <= TOP_OF_MONTH; i++) {
         SimpleDateFormat simpleDateFormat = new SimpleDateFormat(datePattern);
-        simpleDateFormat.setTimeZone(gmtTimeZone); // do all date formatting in GMT
+        simpleDateFormat.setTimeZone(gmtTimeZone); // do all date formatting in
+                                                   // GMT
         String r0 = simpleDateFormat.format(epoch);
         rollingCalendar.setType(i);
         Date next = new Date(rollingCalendar.getNextCheckMillis(epoch));
-        String r1 =  simpleDateFormat.format(next);
-        //System.out.println("Type = "+i+", r0 = "+r0+", r1 = "+r1);
-        if(r0 != null && r1 != null && !r0.equals(r1)) {
+        String r1 = simpleDateFormat.format(next);
+        // System.out.println("Type = "+i+", r0 = "+r0+", r1 = "+r1);
+        if (r0 != null && r1 != null && !r0.equals(r1)) {
           return i;
         }
       }
@@ -336,7 +331,7 @@
   }
 
   /**
-    Rollover the current file to a new file.
+   * Rollover the current file to a new file.
    */
   void rollOver() throws IOException {
 
@@ -346,7 +341,7 @@
       return;
     }
 
-    String datedFilename = fileName+sdf.format(now);
+    String datedFilename = fileName + sdf.format(now);
     // It is too early to roll over because we are still within the
     // bounds of the current interval. Rollover will occur once the
     // next interval is reached.
@@ -354,38 +349,35 @@
       return;
     }
 
-	
     // close current file, and rename it to datedFilename
     this.closeFile();
 
-
-    File target  = new File(scheduledFilename);
+    File target = new File(scheduledFilename);
     if (target.exists()) {
       target.delete();
     }
 
     File file = new File(fileName);
-    
+
     boolean result = file.renameTo(target);
-    if(result) {
-      LogLog.debug(fileName +" -> "+ scheduledFilename);
+    if (result) {
+      LogLog.debug(fileName + " -> " + scheduledFilename);
     } else {
-      LogLog.error("Failed to rename ["+fileName+"] to ["+scheduledFilename+"].");
+      LogLog.error("Failed to rename [" + fileName + "] to ["
+          + scheduledFilename + "].");
     }
 
     try {
       // This will also close the file. This is OK since multiple
       // close operations are safe.
       this.setFile(fileName, false, this.bufferedIO, this.bufferSize);
+    } catch (IOException e) {
+      errorHandler.error("setFile(" + fileName + ", false) call failed.");
     }
-    catch(IOException e) {
-      errorHandler.error("setFile("+fileName+", false) call failed.");
-    }    
     scheduledFilename = datedFilename;
     cleanUp();
   }
 
-  
   public String getCleanUpRegex() {
     return cleanUpRegex;
   }
@@ -406,194 +398,194 @@
     String regex = "";
     try {
       File actualFile = new File(fileName);
-      
+
       String directoryName = actualFile.getParent();
       String actualFileName = actualFile.getName();
       File dirList = new File(directoryName);
-      
-      
+
       if (cleanUpRegex == null || !cleanUpRegex.contains("$fileName")) {
-        LogLog.error("cleanUpRegex == null || !cleanUpRegex.contains(\"$fileName\")");
+        LogLog
+            .error("cleanUpRegex == null || !cleanUpRegex.contains(\"$fileName\")");
         return;
       }
-      regex =cleanUpRegex.replace("$fileName", actualFileName);
-      String[] dirFiles = dirList.list(new LogFilter(actualFileName,regex));
-      
+      regex = cleanUpRegex.replace("$fileName", actualFileName);
+      String[] dirFiles = dirList.list(new LogFilter(actualFileName, regex));
+
       List<String> files = new ArrayList<String>();
-      for(String file: dirFiles) {
-        files.add(file); 
+      for (String file : dirFiles) {
+        files.add(file);
       }
       Collections.sort(files);
-      
-      while(files.size() > maxBackupIndex) {
+
+      while (files.size() > maxBackupIndex) {
         String file = files.remove(0);
-        File f = new File(directoryName + "/" +file);
+        File f = new File(directoryName + "/" + file);
         f.delete();
-        LogLog.debug("Removing: " +file);
+        LogLog.debug("Removing: " + file);
       }
-    } catch(Exception e) {
-      errorHandler.error("cleanUp("+fileName+"," + regex +") call failed.");
+    } catch (Exception e) {
+      errorHandler
+          .error("cleanUp(" + fileName + "," + regex + ") call failed.");
     }
   }
-  
+
   private class LogFilter implements FilenameFilter {
     private Pattern p = null;
     private String logFile = null;
- 
-    public LogFilter(String logFile,String regex) {
+
+    public LogFilter(String logFile, String regex) {
       this.logFile = logFile;
-      p = Pattern.compile(regex); 
+      p = Pattern.compile(regex);
     }
- 
+
     @Override
     public boolean accept(File dir, String name) {
       // ignore current log file
-      if (name.intern() == this.logFile.intern() ) {
+      if (name.intern() == this.logFile.intern()) {
         return false;
       }
-      //ignore file without the same prefix
+      // ignore file without the same prefix
       if (!name.startsWith(logFile)) {
         return false;
       }
       return p.matcher(name).find();
     }
   }
-  
-  private class ClientFinalizer extends Thread 
-  {
-	  private ChukwaAgentController chukwaClient = null;
-	  private String recordType = null;
-	  private String fileName = null;
-	  public ClientFinalizer(ChukwaAgentController chukwaClient,String recordType, String fileName)
-	  {
-		  this.chukwaClient = chukwaClient;
-		  this.recordType = recordType;
-		  this.fileName = fileName;
-	  }
-	    public synchronized void run() 
-	    {
-	      try 
-	      {
-	    	  if (chukwaClient != null)
-	    	  {
-	    		  log.debug("ShutdownHook: removing:" + fileName);
-	    		  chukwaClient.removeFile(recordType, fileName);
-	    	  }
-	    	  else
-	    	  {
-	    		  LogLog.warn("chukwaClient is null cannot do any cleanup");
-	    	  }
-	      } 
-	      catch (Throwable e) 
-	      {
-	    	  LogLog.warn("closing the controller threw an exception:\n" + e);
-	      }
-	    }
-	  }
-	  private ClientFinalizer clientFinalizer = null;
-  
+
+  private class ClientFinalizer extends Thread {
+    private ChukwaAgentController chukwaClient = null;
+    private String recordType = null;
+    private String fileName = null;
+
+    public ClientFinalizer(ChukwaAgentController chukwaClient,
+                           String recordType, String fileName) {
+      this.chukwaClient = chukwaClient;
+      this.recordType = recordType;
+      this.fileName = fileName;
+    }
+
+    public synchronized void run() {
+      try {
+        if (chukwaClient != null) {
+          log.debug("ShutdownHook: removing:" + fileName);
+          chukwaClient.removeFile(recordType, fileName);
+        } else {
+          LogLog.warn("chukwaClient is null, cannot do any cleanup");
+        }
+      } catch (Throwable e) {
+        LogLog.warn("closing the controller threw an exception:\n" + e);
+      }
+    }
+  }
+
+  private ClientFinalizer clientFinalizer = null;
+
   /**
-   * This method differentiates DailyRollingFileAppender from its
-   * super class.
-   *
-   * <p>Before actually logging, this method will check whether it is
-   * time to do a rollover. If it is, it will schedule the next
-   * rollover time and then rollover.
+   * This method differentiates DailyRollingFileAppender from its super class.
+   * 
+   * <p>Before actually logging, this method will check whether it is time to do
+   * a rollover. If it is, it will schedule the next rollover time and then
+   * rollover.
    * */
-  protected void subAppend(LoggingEvent event) 
-  {
-	  try
-	  {
-		  //we set up the chukwa adaptor here because this is the first
-		  //point which is called after all setters have been called with
-		  //their values from the log4j.properties file, in particular we
-		  //needed to give setCukwaClientPortNum() and -Hostname() a shot
-		  
-		  // Make sure only one thread can do this
-		  // and use the boolean to avoid the first level locking
-		  if (chukwaClientIsNull)
-		  {
-			  synchronized(chukwaLock)
-			  {
-				  if (chukwaClient == null){
-					  if (getChukwaClientHostname() != null && getChukwaClientPortNum() != 0){
-						  chukwaClient = new ChukwaAgentController(getChukwaClientHostname(), getChukwaClientPortNum());
-						  log.debug("setup adaptor with hostname " + getChukwaClientHostname() + " and portnum " + getChukwaClientPortNum());
-					  }
-					  else{
-						  chukwaClient = new ChukwaAgentController();
-						  log.debug("setup adaptor with no args, which means it used its defaults");
-					  }
-
-					  chukwaClientIsNull = false;
-					  
-					  // Watchdog is watching for ChukwaAgent only once every 5 minutes, so there's no point in retrying more than once every 5 mins.
-					  // In practice, if the watchdog is not able to automatically restart the agent, it will take more than 20 minutes to get Ops to restart it.
-					  // Also its a good  to limit the number of communications between Hadoop and Chukwa, that's why 30 minutes.
-					  long retryInterval = chukwaClientConnectRetryInterval;
-					  if (retryInterval == 0) {
-					    retryInterval = 1000 * 60 * 30;
-					  }
-					  long numRetries = chukwaClientConnectNumRetry;
-					  if (numRetries == 0) {
-					    numRetries = 48;
-					  }
-					  String log4jFileName = getFile();
-					  String recordType = getRecordType();
-					  long adaptorID = chukwaClient.addFile(recordType, log4jFileName, numRetries, retryInterval);
-
-					  // Setup a shutdownHook for the controller
-					  clientFinalizer = new ClientFinalizer(chukwaClient,recordType,log4jFileName);
-					  Runtime.getRuntime().addShutdownHook(clientFinalizer);
-
-					  
-					  if (adaptorID > 0){
-						  log.debug("Added file tailing adaptor to chukwa agent for file " + log4jFileName + "using this recordType :" + recordType);
-					  }
-					  else{
-						  log.debug("Chukwa adaptor not added, addFile(" + log4jFileName + ") returned " + adaptorID);
-					  }
-					  
-				  }				  
-			  }
-		  }
-		  
-
-		  long n = System.currentTimeMillis();
-		  if (n >= nextCheck) {
-			  now.setTime(n);
-			  nextCheck = rc.getNextCheckMillis(now);
-			  try {
-				  rollOver();
-			  }
-			  catch(IOException ioe) {
-				  LogLog.error("rollOver() failed.", ioe);
-			  }
-		  }
-		  //escape the newlines from record bodies and then write this record to the log file
-		  this.qw.write(RecordConstants.escapeAllButLastRecordSeparator("\n",this.layout.format(event)));
-
-		  if(layout.ignoresThrowable()) {
-			  String[] s = event.getThrowableStrRep();
-			  if (s != null) {
-				  int len = s.length;
-				  for(int i = 0; i < len; i++) {
-					  this.qw.write(s[i]);
-					  this.qw.write(Layout.LINE_SEP);
-				  }
-			  }
-		  }
-
-		  if(this.immediateFlush) {
-			  this.qw.flush();
-		  }		  
-	  }
-	  catch(Throwable e)
-	  {
-		  System.err.println("Exception in ChukwaRollingAppender: " + e.getMessage());
-		  e.printStackTrace();
-	  }
-    
+  protected void subAppend(LoggingEvent event) {
+    try {
+      // we set up the chukwa adaptor here because this is the first
+      // point which is called after all setters have been called with
+      // their values from the log4j.properties file, in particular we
+      // needed to give setChukwaClientPortNum() and -Hostname() a shot
+
+      // Make sure only one thread can do this
+      // and use the boolean to avoid the first level locking
+      if (chukwaClientIsNull) {
+        synchronized (chukwaLock) {
+          if (chukwaClient == null) {
+            if (getChukwaClientHostname() != null
+                && getChukwaClientPortNum() != 0) {
+              chukwaClient = new ChukwaAgentController(
+                  getChukwaClientHostname(), getChukwaClientPortNum());
+              log.debug("setup adaptor with hostname "
+                  + getChukwaClientHostname() + " and portnum "
+                  + getChukwaClientPortNum());
+            } else {
+              chukwaClient = new ChukwaAgentController();
+              log
+                  .debug("setup adaptor with no args, which means it used its defaults");
+            }
+
+            chukwaClientIsNull = false;
+
+            // Watchdog is watching for ChukwaAgent only once every 5 minutes,
+            // so there's no point in retrying more than once every 5 mins.
+            // In practice, if the watchdog is not able to automatically restart
+            // the agent, it will take more than 20 minutes to get Ops to
+            // restart it.
+            // Also it's a good idea to limit the number of communications between
+            // Hadoop and Chukwa, that's why 30 minutes.
+            long retryInterval = chukwaClientConnectRetryInterval;
+            if (retryInterval == 0) {
+              retryInterval = 1000 * 60 * 30;
+            }
+            long numRetries = chukwaClientConnectNumRetry;
+            if (numRetries == 0) {
+              numRetries = 48;
+            }
+            String log4jFileName = getFile();
+            String recordType = getRecordType();
+            long adaptorID = chukwaClient.addFile(recordType, log4jFileName,
+                numRetries, retryInterval);
+
+            // Setup a shutdownHook for the controller
+            clientFinalizer = new ClientFinalizer(chukwaClient, recordType,
+                log4jFileName);
+            Runtime.getRuntime().addShutdownHook(clientFinalizer);
+
+            if (adaptorID > 0) {
+              log.debug("Added file tailing adaptor to chukwa agent for file "
+                  + log4jFileName + " using this recordType: " + recordType);
+            } else {
+              log.debug("Chukwa adaptor not added, addFile(" + log4jFileName
+                  + ") returned " + adaptorID);
+            }
+
+          }
+        }
+      }
+
+      long n = System.currentTimeMillis();
+      if (n >= nextCheck) {
+        now.setTime(n);
+        nextCheck = rc.getNextCheckMillis(now);
+        try {
+          rollOver();
+        } catch (IOException ioe) {
+          LogLog.error("rollOver() failed.", ioe);
+        }
+      }
+      // escape the newlines from record bodies and then write this record to
+      // the log file
+      this.qw.write(RecordConstants.escapeAllButLastRecordSeparator("\n",
+          this.layout.format(event)));
+
+      if (layout.ignoresThrowable()) {
+        String[] s = event.getThrowableStrRep();
+        if (s != null) {
+          int len = s.length;
+          for (int i = 0; i < len; i++) {
+            this.qw.write(s[i]);
+            this.qw.write(Layout.LINE_SEP);
+          }
+        }
+      }
+
+      if (this.immediateFlush) {
+        this.qw.flush();
+      }
+    } catch (Throwable e) {
+      System.err.println("Exception in ChukwaRollingAppender: "
+          + e.getMessage());
+      e.printStackTrace();
+    }
+
   }
 
   public String getChukwaClientHostname() {
@@ -612,38 +604,37 @@
     this.chukwaClientPortNum = chukwaClientPortNum;
   }
 
-  public void setChukwaClientConnectNumRetry(int i){
+  public void setChukwaClientConnectNumRetry(int i) {
     this.chukwaClientConnectNumRetry = i;
   }
-  
+
   public void setChukwaClientConnectRetryInterval(long i) {
     this.chukwaClientConnectRetryInterval = i;
   }
-  
-}
 
+}
 
 
 /**
- *  RollingCalendar is a helper class to DailyRollingFileAppender.
- *  Given a periodicity type and the current time, it computes the
- *  start of the next interval.  
+ * RollingCalendar is a helper class to DailyRollingFileAppender. Given a
+ * periodicity type and the current time, it computes the start of the next
+ * interval.
  * */
 class RollingCalendar extends GregorianCalendar {
 
   /**
 	 * 
 	 */
-	private static final long serialVersionUID = 2153481574198792767L;
-int type = ChukwaDailyRollingFileAppender.TOP_OF_TROUBLE;
+  private static final long serialVersionUID = 2153481574198792767L;
+  int type = ChukwaDailyRollingFileAppender.TOP_OF_TROUBLE;
 
   RollingCalendar() {
     super();
-  }  
+  }
 
   RollingCalendar(TimeZone tz, Locale locale) {
     super(tz, locale);
-  }  
+  }
 
   void setType(int type) {
     this.type = type;
@@ -656,7 +647,7 @@
   public Date getNextCheckDate(Date now) {
     this.setTime(now);
 
-    switch(type) {
+    switch (type) {
     case ChukwaDailyRollingFileAppender.TOP_OF_MINUTE:
       this.set(Calendar.SECOND, 0);
       this.set(Calendar.MILLISECOND, 0);
@@ -673,7 +664,7 @@
       this.set(Calendar.SECOND, 0);
       this.set(Calendar.MILLISECOND, 0);
       int hour = get(Calendar.HOUR_OF_DAY);
-      if(hour < 12) {
+      if (hour < 12) {
         this.set(Calendar.HOUR_OF_DAY, 12);
       } else {
         this.set(Calendar.HOUR_OF_DAY, 0);
@@ -707,4 +698,3 @@
     return getTime();
   }
 }
-
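
For reference, the appender's setters above map directly onto log4j.properties keys through log4j's bean introspection. A minimal configuration sketch, with placeholder file path, record type, and agent host/port:

    log4j.rootLogger=INFO, CHUKWA
    log4j.appender.CHUKWA=org.apache.hadoop.chukwa.inputtools.log4j.ChukwaDailyRollingFileAppender
    log4j.appender.CHUKWA.File=/var/log/myapp/myapp.log
    # computeCheckPeriod() resolves this pattern to daily rollover: it is the
    # shortest period type for which the epoch and the next check date format
    # differently under the pattern.
    log4j.appender.CHUKWA.DatePattern='.'yyyy-MM-dd
    log4j.appender.CHUKWA.RecordType=MyAppLog
    log4j.appender.CHUKWA.ChukwaClientHostname=localhost
    log4j.appender.CHUKWA.ChukwaClientPortNum=9093
    log4j.appender.CHUKWA.layout=org.apache.log4j.PatternLayout
    log4j.appender.CHUKWA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n

On the first append, subAppend() registers the file with the local Chukwa agent as a CharFileTailUTF8NewLineEscaped adaptor and installs a shutdown hook (ClientFinalizer) that removes it again on exit.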

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4JMetricsContext.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4JMetricsContext.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4JMetricsContext.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4JMetricsContext.java Wed Mar 11 22:39:26 2009
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.chukwa.inputtools.log4j;
 
+
 import java.io.*;
 import java.util.Enumeration;
 import java.util.logging.LogManager;
@@ -34,28 +35,27 @@
 
 public class Log4JMetricsContext extends AbstractMetricsContext {
 
-  Logger out = null; //Logger.getLogger(Log4JMetricsContext.class);
+  Logger out = null; // Logger.getLogger(Log4JMetricsContext.class);
   static final Object lock = new Object();
-  
+
   /* Configuration attribute names */
-//  protected static final String FILE_NAME_PROPERTY = "fileName";
+  // protected static final String FILE_NAME_PROPERTY = "fileName";
   protected static final String PERIOD_PROPERTY = "period";
-  private static final String metricsLogDir = System.getProperty("hadoop.log.dir");
+  private static final String metricsLogDir = System
+      .getProperty("hadoop.log.dir");
   private static final String user = System.getProperty("user.name");
 
-    
   /** Creates a new instance of FileContext */
-  public Log4JMetricsContext() {}
-     
+  public Log4JMetricsContext() {
+  }
+
   public void init(String contextName, ContextFactory factory) {
     super.init(contextName, factory);
-  /*      
-    String fileName = getAttribute(FILE_NAME_PROPERTY);
-    if (fileName != null) {
-      file = new File(fileName);
-    }
-    */
-    
+    /*
+     * String fileName = getAttribute(FILE_NAME_PROPERTY); if (fileName != null)
+     * { file = new File(fileName); }
+     */
+
     String periodStr = getAttribute(PERIOD_PROPERTY);
     if (periodStr != null) {
       int period = 0;
@@ -69,63 +69,82 @@
       setPeriod(period);
     }
   }
-  
+
   @Override
-  protected void emitRecord(String contextName, String recordName, OutputRecord outRec)
-      throws IOException {
-	  if (out == null) {
-		  synchronized(lock) {
-			  if (out == null) {
-				  java.util.Properties properties = new java.util.Properties();
-				  properties.load(this.getClass().getClassLoader().getResourceAsStream("chukwa-hadoop-metrics-log4j.properties"));
-				  Logger logger = Logger.getLogger(Log4JMetricsContext.class);
-                  logger.setAdditivity(false);
-				  PatternLayout layout = new PatternLayout(properties.getProperty("log4j.appender.chukwa."+contextName+".layout.ConversionPattern"));
-				      org.apache.hadoop.chukwa.inputtools.log4j.ChukwaDailyRollingFileAppender appender =
-				        new org.apache.hadoop.chukwa.inputtools.log4j.ChukwaDailyRollingFileAppender();
-				  appender.setName("chukwa."+contextName);
-				  appender.setLayout(layout);
-				  appender.setAppend(true);
-				  if(properties.getProperty("log4j.appender.chukwa."+contextName+".Dir")!=null) {
-				    String logName = properties.getProperty("log4j.appender.chukwa."+contextName+".Dir")
-				    +File.separator+"chukwa-"+user+"-"+contextName + "-" + System.currentTimeMillis() +".log";
-
-					  // FIXME: Hack to make the log file readable by chukwa user. 
-					  if(System.getProperty("os.name").intern()=="Linux".intern()) {
-						  Runtime.getRuntime().exec("chmod 640 "+logName);
-					  }
-				      appender.setFile(logName);					  
-				  } else {
-				    appender.setFile(metricsLogDir+File.separator+"chukwa-"+user+"-"
-				        +contextName + "-" + System.currentTimeMillis()+ ".log");
-				  }
-				  appender.activateOptions();
-				  appender.setRecordType(properties.getProperty("log4j.appender.chukwa."+contextName+".recordType"));
-				  appender.setChukwaClientHostname(properties.getProperty("log4j.appender.chukwa."+contextName+".chukwaClientHostname"));
-				  appender.setChukwaClientPortNum(Integer.parseInt(properties.getProperty("log4j.appender.chukwa."+contextName+".chukwaClientPortNum")));
-				  appender.setDatePattern(properties.getProperty("log4j.appender.chukwa."+contextName+".DatePattern"));
-				  logger.addAppender(appender);
-				  out = logger;
-			  }
-		  }
-	  }
-	  
-	  
-	JSONObject json = new JSONObject();
+  protected void emitRecord(String contextName, String recordName,
+      OutputRecord outRec) throws IOException {
+    if (out == null) {
+      synchronized (lock) {
+        if (out == null) {
+          java.util.Properties properties = new java.util.Properties();
+          properties.load(this.getClass().getClassLoader().getResourceAsStream(
+              "chukwa-hadoop-metrics-log4j.properties"));
+          Logger logger = Logger.getLogger(Log4JMetricsContext.class);
+          logger.setAdditivity(false);
+          PatternLayout layout = new PatternLayout(properties
+              .getProperty("log4j.appender.chukwa." + contextName
+                  + ".layout.ConversionPattern"));
+          org.apache.hadoop.chukwa.inputtools.log4j.ChukwaDailyRollingFileAppender appender = new org.apache.hadoop.chukwa.inputtools.log4j.ChukwaDailyRollingFileAppender();
+          appender.setName("chukwa." + contextName);
+          appender.setLayout(layout);
+          appender.setAppend(true);
+          if (properties.getProperty("log4j.appender.chukwa." + contextName
+              + ".Dir") != null) {
+            String logName = properties.getProperty("log4j.appender.chukwa."
+                + contextName + ".Dir")
+                + File.separator
+                + "chukwa-"
+                + user
+                + "-"
+                + contextName
+                + "-"
+                + System.currentTimeMillis() + ".log";
+
+            // FIXME: Hack to make the log file readable by chukwa user.
+            if (System.getProperty("os.name").intern() == "Linux".intern()) {
+              Runtime.getRuntime().exec("chmod 640 " + logName);
+            }
+            appender.setFile(logName);
+          } else {
+            appender
+                .setFile(metricsLogDir + File.separator + "chukwa-" + user
+                    + "-" + contextName + "-" + System.currentTimeMillis()
+                    + ".log");
+          }
+          appender.activateOptions();
+          appender.setRecordType(properties
+              .getProperty("log4j.appender.chukwa." + contextName
+                  + ".recordType"));
+          appender.setChukwaClientHostname(properties
+              .getProperty("log4j.appender.chukwa." + contextName
+                  + ".chukwaClientHostname"));
+          appender.setChukwaClientPortNum(Integer.parseInt(properties
+              .getProperty("log4j.appender.chukwa." + contextName
+                  + ".chukwaClientPortNum")));
+          appender.setDatePattern(properties
+              .getProperty("log4j.appender.chukwa." + contextName
+                  + ".DatePattern"));
+          logger.addAppender(appender);
+          out = logger;
+        }
+      }
+    }
+
+    JSONObject json = new JSONObject();
     try {
-		json.put("contextName", contextName);
-		json.put("recordName", recordName);
-		json.put("chukwa_timestamp", System.currentTimeMillis());
-	    for (String tagName : outRec.getTagNames()) {
-            json.put(tagName, outRec.getTag(tagName));
-	    }
-	    for (String metricName : outRec.getMetricNames()) {
-	    	json.put(metricName, outRec.getMetric(metricName));
-	    }
+      json.put("contextName", contextName);
+      json.put("recordName", recordName);
+      json.put("chukwa_timestamp", System.currentTimeMillis());
+      for (String tagName : outRec.getTagNames()) {
+        json.put(tagName, outRec.getTag(tagName));
+      }
+      for (String metricName : outRec.getMetricNames()) {
+        json.put(metricName, outRec.getMetric(metricName));
+      }
     } catch (JSONException e) {
-		// TODO Auto-generated catch block
-		e.printStackTrace();
-	}    
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
     out.info(json.toString());
   }
 
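
Log4JMetricsContext lazily builds a ChukwaDailyRollingFileAppender from the chukwa-hadoop-metrics-log4j.properties resource and writes each metrics record as one JSON line. A sketch of the two files involved, using the "dfs" context and placeholder values; the hadoop-metrics.properties side follows the standard metrics-v1 contextName.class/contextName.period convention (matching the "period" attribute parsed in init()), and the log4j-side key prefix is exactly the one built in emitRecord():

    # hadoop-metrics.properties
    dfs.class=org.apache.hadoop.chukwa.inputtools.log4j.Log4JMetricsContext
    dfs.period=60

    # chukwa-hadoop-metrics-log4j.properties (loaded from the classpath)
    log4j.appender.chukwa.dfs.Dir=/var/log/hadoop/metrics
    log4j.appender.chukwa.dfs.recordType=HadoopMetrics
    log4j.appender.chukwa.dfs.chukwaClientHostname=localhost
    log4j.appender.chukwa.dfs.chukwaClientPortNum=9093
    log4j.appender.chukwa.dfs.DatePattern='.'yyyy-MM-dd
    log4j.appender.chukwa.dfs.layout.ConversionPattern=%d %m%n

If the .Dir key is absent, the log file falls back to the hadoop.log.dir system property, as the code above shows.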

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/OneLineLogLayout.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/OneLineLogLayout.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/OneLineLogLayout.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/OneLineLogLayout.java Wed Mar 11 22:39:26 2009
@@ -18,21 +18,21 @@
 
 package org.apache.hadoop.chukwa.inputtools.log4j;
 
+
 import org.apache.log4j.*;
 import org.apache.log4j.spi.LoggingEvent;
 
 public class OneLineLogLayout extends PatternLayout {
-  
+
   char SEP = ' ';
-  public String format(LoggingEvent evt)
-  {
-    
+
+  public String format(LoggingEvent evt) {
+
     String initial_s = super.format(evt);
     StringBuilder sb = new StringBuilder();
-    for(int i = 0; i < initial_s.length() -1 ; ++i)
-    {
+    for (int i = 0; i < initial_s.length() - 1; ++i) {
       char c = initial_s.charAt(i);
-      if(c == '\n')
+      if (c == '\n')
         sb.append(SEP);
       else
         sb.append(c);
@@ -41,23 +41,21 @@
     String[] s = evt.getThrowableStrRep();
     if (s != null) {
       int len = s.length;
-      for(int i = 0; i < len; i++) {
+      for (int i = 0; i < len; i++) {
         sb.append(s[i]);
         sb.append(SEP);
-        }
+      }
     }
-    
+
     sb.append('\n');
     return sb.toString();
   }
-  
-  public boolean ignoresThrowable()
-  {
+
+  public boolean ignoresThrowable() {
     return false;
   }
-  
-  public static void main(String[] args)
-  {
+
+  public static void main(String[] args) {
     System.setProperty("line.separator", " ");
     Logger l = Logger.getRootLogger();
     l.removeAllAppenders();
@@ -65,8 +63,7 @@
     appender.setName("console");
     l.addAppender(appender);
     l.warn("testing", new java.io.IOException("just kidding!"));
-    
-    
+
   }
 
 }
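
Because format() replaces every interior newline with a space and ignoresThrowable() returns false (so the layout renders the stack trace itself), a multi-line event collapses into one physical line, keeping one record per line for the tailing adaptor. A minimal sketch, assuming log4j 1.2's LoggingEvent constructor; the logger name and message are illustrative:

    import org.apache.hadoop.chukwa.inputtools.log4j.OneLineLogLayout;
    import org.apache.log4j.Level;
    import org.apache.log4j.Logger;
    import org.apache.log4j.spi.LoggingEvent;

    public class OneLineDemo {
      public static void main(String[] args) {
        OneLineLogLayout layout = new OneLineLogLayout();
        layout.setConversionPattern("%p %m%n");
        LoggingEvent evt = new LoggingEvent(Logger.class.getName(),
            Logger.getLogger("demo"), Level.WARN, "line one\nline two",
            new java.io.IOException("just kidding!"));
        // Prints one physical line: newlines in the message and in the
        // stack trace become ' ', with a single trailing '\n'.
        System.out.print(layout.format(evt));
      }
    }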

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/DataConfig.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/DataConfig.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/DataConfig.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/DataConfig.java Wed Mar 11 22:39:26 2009
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.chukwa.inputtools.mdl;
 
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -27,51 +28,55 @@
 import java.util.Map;
 
 public class DataConfig {
-    private static Configuration config;
-    final static String DATACONFIG = "mdl.xml";
-    private Log log = LogFactory.getLog(DataConfig.class);
-    
-    public DataConfig(String path) {
-        Path fileResource = new Path(path);
+  private static Configuration config;
+  final static String DATACONFIG = "mdl.xml";
+  private Log log = LogFactory.getLog(DataConfig.class);
+
+  public DataConfig(String path) {
+    Path fileResource = new Path(path);
+    config = new Configuration();
+    config.addResource(fileResource);
+  }
+
+  public DataConfig() {
+    String dataConfig = System.getenv("DATACONFIG");
+    if (dataConfig == null) {
+      dataConfig = DATACONFIG;
+    }
+    log.debug("DATACONFIG=" + dataConfig);
+    if (config == null) {
+      try {
+        Path fileResource = new Path(dataConfig);
         config = new Configuration();
         config.addResource(fileResource);
+      } catch (Exception e) {
+        log.debug("Error reading configuration file:" + dataConfig);
+      }
     }
-    public DataConfig() {
-    	String dataConfig = System.getenv("DATACONFIG");
-    	if(dataConfig==null) {
-    		dataConfig=DATACONFIG;
-    	}
-    	log.debug("DATACONFIG="+dataConfig);
-    	if(config==null)  {
-    		try {
-                Path fileResource = new Path(dataConfig);
-                config = new Configuration();
-                config.addResource(fileResource);
-    		} catch (Exception e) {
-    			log.debug("Error reading configuration file:"+dataConfig);
-    		}
-    	}
-    }
+  }
 
-    public String get(String key) {
-        return config.get(key);
-    }
-    public void put(String key, String value) {
-        config.set(key, value);
-    }
-    public Iterator<Map.Entry <String, String>> iterator() {
-        return config.iterator();
-    }
-    public HashMap<String, String> startWith(String key) {
-        HashMap<String, String> transformer = new HashMap<String, String>();
-        Iterator<Map.Entry <String, String>> entries = config.iterator();
-        while(entries.hasNext()) {
-           String entry = entries.next().toString();
-           if(entry.startsWith(key)) {
-               String[] metrics = entry.split("=");
-               transformer.put(metrics[0],metrics[1]);
-           }
-        }
-        return transformer;
+  public String get(String key) {
+    return config.get(key);
+  }
+
+  public void put(String key, String value) {
+    config.set(key, value);
+  }
+
+  public Iterator<Map.Entry<String, String>> iterator() {
+    return config.iterator();
+  }
+
+  public HashMap<String, String> startWith(String key) {
+    HashMap<String, String> transformer = new HashMap<String, String>();
+    Iterator<Map.Entry<String, String>> entries = config.iterator();
+    while (entries.hasNext()) {
+      String entry = entries.next().toString();
+      if (entry.startsWith(key)) {
+        String[] metrics = entry.split("=");
+        transformer.put(metrics[0], metrics[1]);
+      }
     }
+    return transformer;
+  }
 }
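
DataConfig wraps a Hadoop Configuration loaded from the file named by the DATACONFIG environment variable, defaulting to mdl.xml. A short usage sketch with hypothetical key names; note that startWith() matches each entry's "key=value" string against the prefix and splits it on '=':

    import java.util.HashMap;
    import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;

    public class MdlConfigDemo {
      public static void main(String[] args) {
        DataConfig conf = new DataConfig();        // resolves DATACONFIG, else mdl.xml
        String server = conf.get("torque.server"); // hypothetical key
        // All entries whose key starts with the prefix, as a key -> value map.
        HashMap<String, String> report = conf.startWith("report.");
        System.out.println(server + " " + report.size());
      }
    }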

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/ErStreamHandler.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/ErStreamHandler.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/ErStreamHandler.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/ErStreamHandler.java Wed Mar 11 22:39:26 2009
@@ -17,48 +17,47 @@
  */
 package org.apache.hadoop.chukwa.inputtools.mdl;
 
+
 import java.lang.Thread;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.BufferedReader;
 import java.lang.StringBuffer;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-public class ErStreamHandler extends Thread{
-	InputStream inpStr;
-	String command;
-	boolean record;
-	
-    private static Log log = LogFactory.getLog(ErStreamHandler.class);	
-    
-	public ErStreamHandler(InputStream inpStr,String command,boolean record){
-		this.inpStr=inpStr;
-		this.command=command;
-		this.record=record;
-
-	}
-
-	public void run(){
-		try {
-			InputStreamReader inpStrd=new InputStreamReader(inpStr);
-			BufferedReader buffRd=new BufferedReader(inpStrd);
-			String line=null;
-			StringBuffer sb=new StringBuffer();
-			while((line=buffRd.readLine())!= null){
-                 sb.append(line);			
-			}
-			buffRd.close();
-			
-			if (record && sb.length()>0) {
-				log.error(command+" execution error:"+sb.toString());				
-			}
-			
-		}catch (Exception e){
-			log.error(command+" error:"+e.getMessage());
-		}
-	}
-	
-	
+public class ErStreamHandler extends Thread {
+  InputStream inpStr;
+  String command;
+  boolean record;
+
+  private static Log log = LogFactory.getLog(ErStreamHandler.class);
+
+  public ErStreamHandler(InputStream inpStr, String command, boolean record) {
+    this.inpStr = inpStr;
+    this.command = command;
+    this.record = record;
+
+  }
+
+  public void run() {
+    try {
+      InputStreamReader inpStrd = new InputStreamReader(inpStr);
+      BufferedReader buffRd = new BufferedReader(inpStrd);
+      String line = null;
+      StringBuffer sb = new StringBuffer();
+      while ((line = buffRd.readLine()) != null) {
+        sb.append(line);
+      }
+      buffRd.close();
+
+      if (record && sb.length() > 0) {
+        log.error(command + " execution error:" + sb.toString());
+      }
+
+    } catch (Exception e) {
+      log.error(command + " error:" + e.getMessage());
+    }
+  }
+
 }
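
ErStreamHandler drains one stream of a child process on a dedicated thread so the child can never block on a full stderr pipe; with record=true, any stderr output is logged as an error tagged with the command. This is the pattern TorqueInfoProcessor uses below, shown here as a standalone sketch with a placeholder command:

    import java.io.IOException;
    import org.apache.hadoop.chukwa.inputtools.mdl.ErStreamHandler;

    public class QstatDemo {
      public static void main(String[] args)
          throws IOException, InterruptedException {
        String command = "ssh torque-server qstat -a"; // placeholder host
        Process p = new ProcessBuilder("ssh", "torque-server", "qstat -a").start();
        // Consume stderr concurrently while stdout is read on this thread.
        ErStreamHandler err = new ErStreamHandler(p.getErrorStream(), command, true);
        err.start();
        // ... read p.getInputStream() here ...
        err.join(); // wait until stderr is fully drained
      }
    }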

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/LoaderServer.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/LoaderServer.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/LoaderServer.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/LoaderServer.java Wed Mar 11 22:39:26 2009
@@ -18,80 +18,81 @@
 
 package org.apache.hadoop.chukwa.inputtools.mdl;
 
+
 import java.io.IOException;
 import java.io.File;
 import java.io.*;
 import java.lang.management.ManagementFactory;
 import java.nio.channels.*;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-
 public class LoaderServer {
-	
-	String name;
-	private static Log log = LogFactory.getLog(LoaderServer.class);
-	private static FileLock lock = null;
-        private static FileOutputStream pidFileOutput = null;
-	
-	public LoaderServer(String name){
-		this.name=name;
-	}
-	
-	public void init() throws IOException{
-  	     String pidLong=ManagementFactory.getRuntimeMXBean().getName();
-  	     String[] items=pidLong.split("@");
-  	     String pid=items[0];
-	     String chukwaPath=System.getProperty("CHUKWA_HOME");
-	     StringBuffer pidFilesb=new StringBuffer();
-	     pidFilesb.append(chukwaPath).append("/var/run/").append(name).append(".pid");
-	     try{
-	         File pidFile= new File(pidFilesb.toString());
-
-	         pidFileOutput= new FileOutputStream(pidFile);
-             pidFileOutput.write(pid.getBytes());
-	         pidFileOutput.flush();
-	         FileChannel channel = pidFileOutput.getChannel();
-	         LoaderServer.lock = channel.tryLock();
-             if(LoaderServer.lock!=null) {
-	             log.info("Initlization succeeded...");
-             } else {
-                 throw(new IOException());
-             }
-	     }catch (IOException ex){
-	    	 System.out.println("Initializaiton failed: can not write pid file.");
-	    	 log.error("Initialization failed...");
-	    	 log.error(ex.getMessage());
-	    	 System.exit(-1);
-	    	 throw ex;
-	    	 
-	     }
-	   
-	}	
-	
-	public void clean(){
-        String chukwaPath=System.getenv("CHUKWA_HOME");
-        StringBuffer pidFilesb=new StringBuffer();
-        pidFilesb.append(chukwaPath).append("/var/run/").append(name).append(".pid"); 
-        String pidFileName=pidFilesb.toString();
-
-        File pidFile=new File(pidFileName);
-        if (!pidFile.exists()) {
-    	   log.error("Delete pid file, No such file or directory: "+pidFileName);
-        } else {
-           try {
-               lock.release();
-	       pidFileOutput.close();
-           } catch(IOException e) {
-               log.error("Unable to release file lock: "+pidFileName);
-           }
-        }
-
-        boolean result=pidFile.delete();
-        if (!result){
-    	   log.error("Delete pid file failed, "+pidFileName);
-        }
-	}
+
+  String name;
+  private static Log log = LogFactory.getLog(LoaderServer.class);
+  private static FileLock lock = null;
+  private static FileOutputStream pidFileOutput = null;
+
+  public LoaderServer(String name) {
+    this.name = name;
+  }
+
+  public void init() throws IOException {
+    String pidLong = ManagementFactory.getRuntimeMXBean().getName();
+    String[] items = pidLong.split("@");
+    String pid = items[0];
+    String chukwaPath = System.getProperty("CHUKWA_HOME");
+    StringBuffer pidFilesb = new StringBuffer();
+    pidFilesb.append(chukwaPath).append("/var/run/").append(name)
+        .append(".pid");
+    try {
+      File pidFile = new File(pidFilesb.toString());
+
+      pidFileOutput = new FileOutputStream(pidFile);
+      pidFileOutput.write(pid.getBytes());
+      pidFileOutput.flush();
+      FileChannel channel = pidFileOutput.getChannel();
+      LoaderServer.lock = channel.tryLock();
+      if (LoaderServer.lock != null) {
+        log.info("Initialization succeeded...");
+      } else {
+        throw (new IOException());
+      }
+    } catch (IOException ex) {
+      System.out.println("Initialization failed: cannot write pid file.");
+      log.error("Initialization failed...");
+      log.error(ex.getMessage());
+      System.exit(-1);
+      throw ex;
+
+    }
+
+  }
+
+  public void clean() {
+    String chukwaPath = System.getenv("CHUKWA_HOME");
+    StringBuffer pidFilesb = new StringBuffer();
+    pidFilesb.append(chukwaPath).append("/var/run/").append(name)
+        .append(".pid");
+    String pidFileName = pidFilesb.toString();
+
+    File pidFile = new File(pidFileName);
+    if (!pidFile.exists()) {
+      log.error("Delete pid file, No such file or directory: " + pidFileName);
+    } else {
+      try {
+        lock.release();
+        pidFileOutput.close();
+      } catch (IOException e) {
+        log.error("Unable to release file lock: " + pidFileName);
+      }
+    }
+
+    boolean result = pidFile.delete();
+    if (!result) {
+      log.error("Delete pid file failed, " + pidFileName);
+    }
+  }
 
 }
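
LoaderServer is a pid-file singleton guard: init() writes the current pid to $CHUKWA_HOME/var/run/<name>.pid and takes a FileLock on it, exiting the JVM if that fails; clean() releases the lock and deletes the file. Note the asymmetry visible above: init() reads CHUKWA_HOME as a system property while clean() reads it from the environment. A minimal lifecycle sketch with a hypothetical server name:

    import java.io.IOException;
    import org.apache.hadoop.chukwa.inputtools.mdl.LoaderServer;

    public class LoaderDemo {
      public static void main(String[] args) throws IOException {
        LoaderServer server = new LoaderServer("mdl-loader"); // hypothetical name
        server.init();      // write the pid file and take the lock, or exit
        try {
          // ... loader work ...
        } finally {
          server.clean();   // release the lock and delete the pid file
        }
      }
    }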

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueDataLoader.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueDataLoader.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueDataLoader.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueDataLoader.java Wed Mar 11 22:39:26 2009
@@ -17,108 +17,102 @@
  */
 package org.apache.hadoop.chukwa.inputtools.mdl;
 
+
 import java.lang.Thread;
 import java.lang.management.ManagementFactory;
 import java.io.FileOutputStream;
 import java.sql.SQLException;
 import java.io.IOException;
 import java.io.File;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.chukwa.inputtools.mdl.TorqueInfoProcessor;
 import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
 import org.apache.hadoop.chukwa.util.PidFile;
 
 public class TorqueDataLoader {
-	   private static Log log = LogFactory.getLog("TorqueDataLoader");
+  private static Log log = LogFactory.getLog("TorqueDataLoader");
+
+  private TorqueInfoProcessor tp = null;
+  private PidFile loader = null;
+
+  public TorqueDataLoader(DataConfig mdlConfig, int interval) {
+    log.info("in torqueDataLoader");
+    tp = new TorqueInfoProcessor(mdlConfig, interval);
+    loader = new PidFile("TorqueDataLoader");
+  }
+
+  public void run() {
+    boolean first = true;
+    while (true) {
+      try {
+        tp.setup(first);
+        first = false;
+      } catch (Exception ex) {
+        tp.shutdown();
+
+        if (first) {
+          log.error("setup error");
+          ex.printStackTrace();
+          loader.clean(); // only call before system.exit()
+          System.exit(1);
+        }
+        log.error("setup fail, retry after 10 minutes");
+        try {
+          Thread.sleep(600 * 1000);
+        } catch (InterruptedException e) {
+          // TODO Auto-generated catch block
+          log.error(e.getMessage());
+          // e.printStackTrace();
+        }
+        continue;
+
+      }
+
+      try {
+        tp.run_forever();
+      } catch (SQLException ex) {
+        tp.shutdown();
+        log.error("processor died, reconnect again after 10 minutes");
+        ex.printStackTrace();
+        try {
+          Thread.sleep(600 * 1000);
+        } catch (InterruptedException e) {
+          // TODO Auto-generated catch block
+          log.error(e.getMessage());
+          // e.printStackTrace();
+        }
+      } catch (Exception ex) {
+        try {
+          Thread.sleep(16 * 1000);
+        } catch (InterruptedException e) {
+          ;
+        }
+        tp.shutdown();
+        log.error("process died...." + ex.getMessage());
+        loader.clean();
+        System.exit(1);
+      }
+
+    }// while
+
+  }
+
+  public static void main(String[] args) {
+    /*
+     * if (args.length < 2 || args[0].startsWith("-h") ||
+     * args[0].startsWith("--h")) {
+     * System.out.println("Usage: UtilDataLoader interval(sec)");
+     * System.exit(1);puvw-./chij } String interval = args[0]; int
+     * intervalValue=Integer.parseInt(interval);
+     */
+    int intervalValue = 60;
+
+    DataConfig mdlConfig = new DataConfig();
+
+    TorqueDataLoader tdl = new TorqueDataLoader(mdlConfig, intervalValue);
+    tdl.run();
 
-	    private TorqueInfoProcessor tp=null;
-        private PidFile loader=null;
-	  
-	    
-	    public TorqueDataLoader (DataConfig mdlConfig, int interval){
-	    	log.info("in torqueDataLoader");
-	   	    tp = new TorqueInfoProcessor(mdlConfig, interval);
-	   	    loader=new PidFile("TorqueDataLoader");
-	    }
-	    
-	    	    	    	    
-	    public void run(){
- 	        boolean first=true;
-	   	    while(true){
-	       	   try{
-	   	           tp.setup(first);
-	   	           first=false;
-	   	        }catch (Exception ex){
-	   	    	   tp.shutdown();
-	   	           
-	   	    	   if (first){
-	   	    	      log.error("setup error");
-	   	    	      ex.printStackTrace();
-	   	    	      loader.clean(); // only call before system.exit()
-	   	    	      System.exit(1);
-	   	            }
-	   	           log.error("setup fail, retry after 10 minutes");
-	   	           try {
-	                     Thread.sleep(600*1000);
-	               } catch (InterruptedException e) {
-	                // TODO Auto-generated catch block
-	                	 log.error(e.getMessage());
-	               // e.printStackTrace();
-	               }
-	   		       continue;   		 
-	   		 
-	   	       }
-	   	     
-	   	       try{
-	   		        tp.run_forever();
-	   	       }catch (SQLException ex) {
-	   		        tp.shutdown();
-	   	    	    log.error("processor died, reconnect again after 10 minutes");
-	   	    	    ex.printStackTrace();
-	   		        try {
-	                     Thread.sleep(600*1000);
-	                } catch (InterruptedException e) {
-	                     // TODO Auto-generated catch block
-	                	    log.error(e.getMessage());
-	                     // e.printStackTrace();
-	                }
-	   	       }catch (Exception ex){
-	   		        try {
-	                     Thread.sleep(16*1000);
-	                } catch (InterruptedException e) {
-	                            ;
-	                }
-	   	    	   tp.shutdown();
-	   	    	   log.error("process died...."+ex.getMessage());
-	   	    	   loader.clean();
-	   	    	   System.exit(1);
-	   	       }
-	   	       
-	   	  }//while
-	   
-	    }
-	    
-	    
-		 public static void main(String[] args) {
-			   /* if (args.length < 2 || args[0].startsWith("-h")
-			        || args[0].startsWith("--h")) {
-			      System.out.println("Usage: UtilDataLoader interval(sec)");
-			      System.exit(1);puvw-./chij
-			    }
-			    String interval = args[0];
-			    int intervalValue=Integer.parseInt(interval);
-			    */
-			   int intervalValue=60;
-
-
-	           DataConfig mdlConfig=new DataConfig();
-	           
-	           TorqueDataLoader tdl = new TorqueDataLoader(mdlConfig, intervalValue);
-	           tdl.run();
+  }
 
-		 }        
-		
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java Wed Mar 11 22:39:26 2009
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.chukwa.inputtools.mdl;
 
+
 import java.sql.SQLException;
 import java.sql.ResultSet;
 import java.lang.Exception;
@@ -40,7 +41,6 @@
 import java.lang.InterruptedException;
 import java.lang.System;
 import java.util.Date;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
@@ -48,439 +48,440 @@
 import org.apache.hadoop.chukwa.inputtools.mdl.ErStreamHandler;
 import org.apache.hadoop.chukwa.util.DatabaseWriter;
 
-
 public class TorqueInfoProcessor {
-    
-	private static Log log = LogFactory.getLog(TorqueInfoProcessor.class);
-    
-    private int intervalValue=60;
-	private String torqueServer = null;
-	private String torqueBinDir= null;
-	private String domain = null;
-
-    private TreeMap <String,TreeMap<String,String>> currentHodJobs;
-    
-    
-	public TorqueInfoProcessor(DataConfig mdlConfig, int interval){
-		this.intervalValue=interval;
-		
-		torqueServer=System.getProperty("TORQUE_SERVER");
-		torqueBinDir=System.getProperty("TORQUE_HOME")+File.separator+"bin";
-		domain=System.getProperty("DOMAIN");
-	    currentHodJobs=new TreeMap<String,TreeMap<String,String>>();
-	}
-	
-	
-	
-	public void setup(boolean recover)throws Exception {
-	 }
-		 
-	 private void  getHodJobInfo() throws IOException {
-		 StringBuffer sb=new StringBuffer();
-		 sb.append(torqueBinDir).append("/qstat -a");
-	 
-		 String[] getQueueInfoCommand=new String [3];
-		 getQueueInfoCommand[0]="ssh";
-		 getQueueInfoCommand[1]=torqueServer;
-		 getQueueInfoCommand[2]=sb.toString();
-		
-		 
-         String command=getQueueInfoCommand[0]+" "+getQueueInfoCommand[1]+" "+getQueueInfoCommand[2];
-		 ProcessBuilder pb= new ProcessBuilder(getQueueInfoCommand);
-         
-		 Process p=pb.start();
-		 
-		 Timer timeout=new Timer();
-		 TorqueTimerTask torqueTimerTask=new TorqueTimerTask(p, command);
-		 timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval*1000);
-
-		 BufferedReader result = new BufferedReader (new InputStreamReader (p.getInputStream()));
-		 ErStreamHandler errorHandler=new ErStreamHandler(p.getErrorStream(),command,true);
-		 errorHandler.start();
-		 
-		 String line = null;
-		 boolean start=false;
-         TreeSet<String> jobsInTorque=new TreeSet<String>();
-		 while((line=result.readLine())!=null){
-			 if (line.startsWith("---")){				 
-				 start=true;
-				 continue;
-			 }
-
-			 if(start){
-				 String [] items=line.split("\\s+");
-				 if (items.length>=10){
-				     String hodIdLong=items[0];				     
-				     String hodId=hodIdLong.split("[.]")[0];
-				     String userId=items[1];
-				     String numOfMachine=items[5];
-				     String status=items[9];
-				     jobsInTorque.add(hodId);
-                     if (!currentHodJobs.containsKey(hodId)) {
-                         TreeMap <String,String> aJobData=new TreeMap <String,String>();
-                     
-				         aJobData.put("userId", userId);
-				         aJobData.put("numOfMachine",numOfMachine);
-				         aJobData.put("traceCheckCount","0");
-                         aJobData.put("process", "0");
-				         aJobData.put("status",status);
-				         currentHodJobs.put(hodId,aJobData);
-				     }else {
-				    	 TreeMap <String,String> aJobData= currentHodJobs.get(hodId);
-				    	 aJobData.put("status", status);
-				         currentHodJobs.put(hodId,aJobData);
-				     }//if..else
-				 }				 
-			 }
-		 }//while
-		 
-         try {
-        	 errorHandler.join();
-         }catch (InterruptedException ie){
-        	 log.error(ie.getMessage());
-         }
-		 timeout.cancel();
-		 
-		 Set<String> currentHodJobIds=currentHodJobs.keySet();
-		 Iterator<String> currentHodJobIdsIt=currentHodJobIds.iterator();
-		 TreeSet<String> finishedHodIds=new TreeSet<String>();
-		 while (currentHodJobIdsIt.hasNext()){
-			 String hodId=currentHodJobIdsIt.next();
-			 if (!jobsInTorque.contains(hodId)) {
-				TreeMap <String,String> aJobData=currentHodJobs.get(hodId);
-				String process=aJobData.get("process");
-				if (process.equals("0") || process.equals("1")) {	
-					aJobData.put("status", "C");
-				}else {
-					finishedHodIds.add(hodId);
-				}
-			 }
-		 }//while
-		 
-		 Iterator<String >finishedHodIdsIt=finishedHodIds.iterator();
-		 while (finishedHodIdsIt.hasNext()){
-			 String hodId=finishedHodIdsIt.next();
-			 currentHodJobs.remove(hodId);
-		 }
-		  		 		 	 
-	 }
-	 
-	 private boolean loadQstatData(String hodId) throws IOException, SQLException {
-		 TreeMap<String,String> aJobData=currentHodJobs.get(hodId);
-		 String userId=aJobData.get("userId");
-		 
-		 StringBuffer sb=new StringBuffer();
-		 sb.append(torqueBinDir).append("/qstat -f -1 ").append(hodId);
-		 String[] qstatCommand=new String [3];
-		 qstatCommand[0]="ssh";
-		 qstatCommand[1]=torqueServer;
-		 qstatCommand[2]=sb.toString();
-		 
-         String command=qstatCommand[0]+" "+qstatCommand[1]+" "+qstatCommand[2];
-		 ProcessBuilder pb= new ProcessBuilder(qstatCommand);         
-		 Process p=pb.start();
-		 
-		 Timer timeout=new Timer();
-		 TorqueTimerTask torqueTimerTask=new TorqueTimerTask(p, command);
-		 timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval*1000);
-
-		 BufferedReader result = new BufferedReader (new InputStreamReader (p.getInputStream()));
-		 ErStreamHandler errorHandler=new ErStreamHandler(p.getErrorStream(),command,false);
-		 errorHandler.start();
-		 String line=null;
-         String hosts=null;
-         long startTimeValue=-1;
-         long endTimeValue=Calendar.getInstance().getTimeInMillis();
-         long executeTimeValue=Calendar.getInstance().getTimeInMillis();
-         boolean qstatfinished;
-        
-		 while((line=result.readLine())!=null){
-			 if (line.indexOf("ctime")>=0){
-				 String startTime=line.split("=")[1].trim();
-				 //Tue Sep  9 23:44:29 2008
-				 SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
-				 Date startTimeDate;
-				try {
-					startTimeDate = sdf.parse(startTime);
-					 startTimeValue=startTimeDate.getTime();
-				} catch (ParseException e) {
-					// TODO Auto-generated catch block
-					e.printStackTrace();
-				}
-				 
-			 }
-			 if (line.indexOf("mtime")>=0){
-				 String endTime=line.split("=")[1].trim();
-				 SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
-				 Date endTimeDate;
-				try {
-					endTimeDate = sdf.parse(endTime);
-					endTimeValue=endTimeDate.getTime();
-				} catch (ParseException e) {
-					// TODO Auto-generated catch block
-					e.printStackTrace();
-				}
-				 
-			 }
-			 if (line.indexOf("etime")>=0){
-				 String executeTime=line.split("=")[1].trim();
-				 SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
-				 Date executeTimeDate;
-				try {
-					executeTimeDate = sdf.parse(executeTime);
-					executeTimeValue=executeTimeDate.getTime();
-				} catch (ParseException e) {
-					// TODO Auto-generated catch block
-					e.printStackTrace();
-				}
-				 
-			 }			 
-			 if (line.indexOf("exec_host")>=0){
-				 hosts=line.split("=")[1].trim();
-			 }
-		  }
-		 
-		  if (hosts!=null && startTimeValue>=0) {
-			 String [] items2=hosts.split("[+]"); 
-			 int num=0;
-		     for (int i=0;i<items2.length;i++) {
-		    	 String machinetmp=items2[i];
-		    	 if( machinetmp.length()>3){
- 			    	 String machine=items2[i].substring(0,items2[i].length()-2);
-            	     StringBuffer data=new StringBuffer();
-            	     data.append("HodId=").append(hodId);
-            	     data.append(", Machine=").append(machine);
-            	     if(domain!=null) {
-            	    	 data.append(".").append(domain);
-            	     }
-            	     log.info(data);
-            	     num++;   
-			      }
-		     }
-			 Timestamp startTimedb=new Timestamp(startTimeValue);
-			 Timestamp endTimedb=new Timestamp(endTimeValue);
-			 StringBuffer data=new StringBuffer();
-			 long timeQueued=executeTimeValue-startTimeValue;
-			 data.append("HodID=").append(hodId);
-			 data.append(", UserId=").append(userId);		
-			 data.append(", StartTime=").append(startTimedb);
-			 data.append(", TimeQueued=").append(timeQueued);
-			 data.append(", NumOfMachines=").append(num);
-			 data.append(", EndTime=").append(endTimedb);
-    	     log.info(data);
-			 qstatfinished=true;
-			 
-	      } else{
-		   		   
-             qstatfinished=false;
+
+  private static Log log = LogFactory.getLog(TorqueInfoProcessor.class);
+
+  private int intervalValue = 60;
+  private String torqueServer = null;
+  private String torqueBinDir = null;
+  private String domain = null;
+
+  private TreeMap<String, TreeMap<String, String>> currentHodJobs;
+
+  public TorqueInfoProcessor(DataConfig mdlConfig, int interval) {
+    this.intervalValue = interval;
+
+    torqueServer = System.getProperty("TORQUE_SERVER");
+    torqueBinDir = System.getProperty("TORQUE_HOME") + File.separator + "bin";
+    domain = System.getProperty("DOMAIN");
+    currentHodJobs = new TreeMap<String, TreeMap<String, String>>();
+  }
+
+  public void setup(boolean recover) throws Exception {
+  }
+
+  private void getHodJobInfo() throws IOException {
+    StringBuffer sb = new StringBuffer();
+    sb.append(torqueBinDir).append("/qstat -a");
+
+    String[] getQueueInfoCommand = new String[3];
+    getQueueInfoCommand[0] = "ssh";
+    getQueueInfoCommand[1] = torqueServer;
+    getQueueInfoCommand[2] = sb.toString();
+
+    String command = getQueueInfoCommand[0] + " " + getQueueInfoCommand[1]
+        + " " + getQueueInfoCommand[2];
+    ProcessBuilder pb = new ProcessBuilder(getQueueInfoCommand);
+
+    Process p = pb.start();
+
+    Timer timeout = new Timer();
+    TorqueTimerTask torqueTimerTask = new TorqueTimerTask(p, command);
+    timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval * 1000);
+
+    BufferedReader result = new BufferedReader(new InputStreamReader(p
+        .getInputStream()));
+    ErStreamHandler errorHandler = new ErStreamHandler(p.getErrorStream(),
+        command, true);
+    errorHandler.start();
+
+    String line = null;
+    boolean start = false;
+    TreeSet<String> jobsInTorque = new TreeSet<String>();
+    while ((line = result.readLine()) != null) {
+      if (line.startsWith("---")) {
+        start = true;
+        continue;
+      }
+
+      if (start) {
+        String[] items = line.split("\\s+");
+        if (items.length >= 10) {
+          String hodIdLong = items[0];
+          String hodId = hodIdLong.split("[.]")[0];
+          String userId = items[1];
+          String numOfMachine = items[5];
+          String status = items[9];
+          jobsInTorque.add(hodId);
+          if (!currentHodJobs.containsKey(hodId)) {
+            TreeMap<String, String> aJobData = new TreeMap<String, String>();
+
+            aJobData.put("userId", userId);
+            aJobData.put("numOfMachine", numOfMachine);
+            aJobData.put("traceCheckCount", "0");
+            aJobData.put("process", "0");
+            aJobData.put("status", status);
+            currentHodJobs.put(hodId, aJobData);
+          } else {
+            TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
+            aJobData.put("status", status);
+            currentHodJobs.put(hodId, aJobData);
+          }// if..else
+        }
+      }
+    }// while
+
+    try {
+      errorHandler.join();
+    } catch (InterruptedException ie) {
+      log.error(ie.getMessage());
+    }
+    timeout.cancel();
+
+    Set<String> currentHodJobIds = currentHodJobs.keySet();
+    Iterator<String> currentHodJobIdsIt = currentHodJobIds.iterator();
+    TreeSet<String> finishedHodIds = new TreeSet<String>();
+    while (currentHodJobIdsIt.hasNext()) {
+      String hodId = currentHodJobIdsIt.next();
+      if (!jobsInTorque.contains(hodId)) {
+        TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
+        String process = aJobData.get("process");
+        if (process.equals("0") || process.equals("1")) {
+          aJobData.put("status", "C");
+        } else {
+          finishedHodIds.add(hodId);
+        }
+      }
+    }// while
+
+    Iterator<String> finishedHodIdsIt = finishedHodIds.iterator();
+    while (finishedHodIdsIt.hasNext()) {
+      String hodId = finishedHodIdsIt.next();
+      currentHodJobs.remove(hodId);
+    }
+
+  }
+
+  private boolean loadQstatData(String hodId) throws IOException, SQLException {
+    TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
+    String userId = aJobData.get("userId");
+
+    StringBuffer sb = new StringBuffer();
+    sb.append(torqueBinDir).append("/qstat -f -1 ").append(hodId);
+    String[] qstatCommand = new String[3];
+    qstatCommand[0] = "ssh";
+    qstatCommand[1] = torqueServer;
+    qstatCommand[2] = sb.toString();
+
+    String command = qstatCommand[0] + " " + qstatCommand[1] + " "
+        + qstatCommand[2];
+    ProcessBuilder pb = new ProcessBuilder(qstatCommand);
+    Process p = pb.start();
+
+    Timer timeout = new Timer();
+    TorqueTimerTask torqueTimerTask = new TorqueTimerTask(p, command);
+    timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval * 1000);
+
+    BufferedReader result = new BufferedReader(new InputStreamReader(p
+        .getInputStream()));
+    ErStreamHandler errorHandler = new ErStreamHandler(p.getErrorStream(),
+        command, false);
+    errorHandler.start();
+    String line = null;
+    String hosts = null;
+    long startTimeValue = -1;
+    long endTimeValue = Calendar.getInstance().getTimeInMillis();
+    long executeTimeValue = Calendar.getInstance().getTimeInMillis();
+    boolean qstatfinished;
+
+    while ((line = result.readLine()) != null) {
+      if (line.indexOf("ctime") >= 0) {
+        String startTime = line.split("=")[1].trim();
+        // Tue Sep 9 23:44:29 2008
+        SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
+        Date startTimeDate;
+        try {
+          startTimeDate = sdf.parse(startTime);
+          startTimeValue = startTimeDate.getTime();
+        } catch (ParseException e) {
+          log.error("failed to parse ctime: " + e.getMessage());
+        }
+
+      }
+      if (line.indexOf("mtime") >= 0) {
+        String endTime = line.split("=")[1].trim();
+        SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
+        Date endTimeDate;
+        try {
+          endTimeDate = sdf.parse(endTime);
+          endTimeValue = endTimeDate.getTime();
+        } catch (ParseException e) {
+          log.error("failed to parse mtime: " + e.getMessage());
+        }
+
+      }
+      if (line.indexOf("etime") >= 0) {
+        String executeTime = line.split("=")[1].trim();
+        SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
+        Date executeTimeDate;
+        try {
+          executeTimeDate = sdf.parse(executeTime);
+          executeTimeValue = executeTimeDate.getTime();
+        } catch (ParseException e) {
+          log.error("failed to parse etime: " + e.getMessage());
+        }
+
+      }
+      if (line.indexOf("exec_host") >= 0) {
+        hosts = line.split("=")[1].trim();
+      }
+    }
+
+    if (hosts != null && startTimeValue >= 0) {
+      String[] items2 = hosts.split("[+]");
+      int num = 0;
+      for (int i = 0; i < items2.length; i++) {
+        String machinetmp = items2[i];
+        if (machinetmp.length() > 3) {
+          String machine = items2[i].substring(0, items2[i].length() - 2);
+          StringBuffer data = new StringBuffer();
+          data.append("HodId=").append(hodId);
+          data.append(", Machine=").append(machine);
+          if (domain != null) {
+            data.append(".").append(domain);
           }
-		    
-          try {
-        	 errorHandler.join();
-          }catch (InterruptedException ie){
-        	 log.error(ie.getMessage());
+          log.info(data);
+          num++;
+        }
+      }
+      Timestamp startTimedb = new Timestamp(startTimeValue);
+      Timestamp endTimedb = new Timestamp(endTimeValue);
+      StringBuffer data = new StringBuffer();
+      long timeQueued = executeTimeValue - startTimeValue;
+      data.append("HodID=").append(hodId);
+      data.append(", UserId=").append(userId);
+      data.append(", StartTime=").append(startTimedb);
+      data.append(", TimeQueued=").append(timeQueued);
+      data.append(", NumOfMachines=").append(num);
+      data.append(", EndTime=").append(endTimedb);
+      log.info(data);
+      qstatfinished = true;
+
+    } else {
+
+      qstatfinished = false;
+    }
+
+    try {
+      errorHandler.join();
+    } catch (InterruptedException ie) {
+      log.error(ie.getMessage());
+    }
+    result.close();
+    timeout.cancel();
+
+    return qstatfinished;
+  }
+
+  private boolean loadTraceJobData(String hodId) throws IOException,
+      SQLException {
+    TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
+    String userId = aJobData.get("userId");
+    String process = aJobData.get("process");
+
+    StringBuffer sb = new StringBuffer();
+    sb.append(torqueBinDir).append("/tracejob -n 10 -l -m -s ").append(hodId);
+    String[] traceJobCommand = new String[3];
+    traceJobCommand[0] = "ssh";
+    traceJobCommand[1] = torqueServer;
+    traceJobCommand[2] = sb.toString();
+
+    String command = traceJobCommand[0] + " " + traceJobCommand[1] + " "
+        + traceJobCommand[2];
+    ProcessBuilder pb = new ProcessBuilder(traceJobCommand);
+
+    Process p = pb.start();
+
+    Timer timeout = new Timer();
+    TorqueTimerTask torqueTimerTask = new TorqueTimerTask(p, command);
+    timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval * 1000);
+
+    BufferedReader result = new BufferedReader(new InputStreamReader(p
+        .getInputStream()));
+    ErStreamHandler errorHandler = new ErStreamHandler(p.getErrorStream(),
+        command, false);
+    errorHandler.start();
+    String line = null;
+    String exit_status = null;
+    String hosts = null;
+    long timeQueued = -1;
+    long startTimeValue = -1;
+    long endTimeValue = -1;
+    boolean findResult = false;
+
+    while ((line = result.readLine()) != null && !findResult) {
+      if (line.indexOf("end") >= 0 && line.indexOf("Exit_status") >= 0
+          && line.indexOf("qtime") >= 0) {
+        TreeMap<String, String> jobData = new TreeMap<String, String>();
+        String[] items = line.split("\\s+");
+        for (int i = 0; i < items.length; i++) {
+          String[] items2 = items[i].split("=");
+          if (items2.length >= 2) {
+            jobData.put(items2[0], items2[1]);
           }
-          result.close();
-          timeout.cancel();
-         	   
-          return qstatfinished;
-	 }
-	
-	 
-	 private boolean loadTraceJobData(String hodId) throws IOException,SQLException{
-		 TreeMap<String,String> aJobData=currentHodJobs.get(hodId);
-		 String userId=aJobData.get("userId");
-		 String process=aJobData.get("process");
-		 
-		 StringBuffer sb=new StringBuffer();
-		 sb.append(torqueBinDir).append("/tracejob -n 10 -l -m -s ").append(hodId);
-		 String[] traceJobCommand=new String [3];
-		 traceJobCommand[0]="ssh";
-		 traceJobCommand[1]=torqueServer;
-		 traceJobCommand[2]=sb.toString();
-		 
-         String command=traceJobCommand[0]+" "+traceJobCommand[1]+" "+traceJobCommand[2];
-		 ProcessBuilder pb= new ProcessBuilder(traceJobCommand);
-         
-		 Process p=pb.start();
-		 
-		 Timer timeout=new Timer();
-		 TorqueTimerTask torqueTimerTask=new TorqueTimerTask(p, command);
-		 timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval*1000);
-
-		 BufferedReader result = new BufferedReader (new InputStreamReader (p.getInputStream()));
-		 ErStreamHandler errorHandler=new ErStreamHandler(p.getErrorStream(),command,false);
-		 errorHandler.start();
-		 String line=null;
-         String exit_status=null;
-         String hosts=null;
-         long timeQueued=-1;
-         long startTimeValue=-1;
-         long endTimeValue=-1;
-         boolean findResult=false;
-
-        
-		 while((line=result.readLine())!=null&& ! findResult){
-			 if (line.indexOf("end")>=0 &&line.indexOf("Exit_status")>=0 && line.indexOf("qtime")>=0){
-			      TreeMap <String,String> jobData=new TreeMap <String,String>() ;
-			      String [] items=line.split("\\s+");
-			      for (int i=0;i<items.length; i++) {
-			    	 String [] items2 = items[i].split("=");
-			    	 if (items2.length>=2){
-			    		 jobData.put(items2[0], items2[1]);
-			    	 }
-
-			      }
-	              String startTime=jobData.get("ctime");
-			      startTimeValue=Long.valueOf(startTime);
-			      startTimeValue=startTimeValue-startTimeValue%(60);
-			      Timestamp startTimedb=new Timestamp(startTimeValue*1000);
-			       
-			      String queueTime=jobData.get("qtime");
-			      long queueTimeValue=Long.valueOf(queueTime);
-			      
-			      String sTime=jobData.get("start");
-			      long sTimeValue=Long.valueOf(sTime);
-			      			      
-			      timeQueued=sTimeValue-queueTimeValue;
-			      
-			      String endTime=jobData.get("end");
-			      endTimeValue=Long.valueOf(endTime);
-			      endTimeValue=endTimeValue-endTimeValue%(60);
-			      Timestamp endTimedb=new Timestamp(endTimeValue*1000);
-			      
-			      exit_status=jobData.get("Exit_status");
-			      hosts=jobData.get("exec_host");
-			      String [] items2=hosts.split("[+]");
-			      int num=0;
-			      for (int i=0;i<items2.length;i++) {
-			          String machinetemp=items2[i];
-			          if (machinetemp.length()>=3){	            		 
-			    		 String machine=items2[i].substring(0,items2[i].length()-2);
-			    	     StringBuffer data=new StringBuffer();
-			    	     data.append("HodId=").append(hodId);
-			             data.append(", Machine=").append(machine);
-		            	 if(domain!=null) {
-			            	  data.append(".").append(domain);
-			             }
-			    	     log.info(data.toString());
-			    	     num++;
-			    	  }  
-			      }
-			      
-			      StringBuffer data=new StringBuffer();
-			      data.append("HodID=").append(hodId);
-			      data.append(", UserId=").append(userId);		
-			      data.append(", Status=").append(exit_status);
-		    	  data.append(", TimeQueued=").append(timeQueued);
-		    	  data.append(", StartTime=").append(startTimedb);
-		    	  data.append(", EndTime=").append(endTimedb);
-		    	  data.append(", NumOfMachines=").append(num);
-		          log.info(data.toString());
-				  findResult=true;
-		          log.debug(" hod info for job "+hodId+" has been loaded ");
-			 }//if
-			 
-		}//while 
-
-         try {
-        	 errorHandler.join();
-         }catch (InterruptedException ie){
-        	 log.error(ie.getMessage());
-         }
-         
-        timeout.cancel();
-        boolean tracedone=false;
-        if (!findResult){
-        	
-            String traceCheckCount=aJobData.get("traceCheckCount");
-            int traceCheckCountValue=Integer.valueOf(traceCheckCount);
-            traceCheckCountValue=traceCheckCountValue+1;           
-            aJobData.put("traceCheckCount",String.valueOf(traceCheckCountValue));
-
-            
-            log.debug("did not find tracejob info for job "+hodId+", after "+traceCheckCountValue+" times checking");
-            if (traceCheckCountValue>=2){ 
-            	tracedone= true;            	
+
+        }
+        String startTime = jobData.get("ctime");
+        startTimeValue = Long.valueOf(startTime);
+        startTimeValue = startTimeValue - startTimeValue % (60);
+        Timestamp startTimedb = new Timestamp(startTimeValue * 1000);
+
+        String queueTime = jobData.get("qtime");
+        long queueTimeValue = Long.valueOf(queueTime);
+
+        String sTime = jobData.get("start");
+        long sTimeValue = Long.valueOf(sTime);
+
+        timeQueued = sTimeValue - queueTimeValue;
+
+        String endTime = jobData.get("end");
+        endTimeValue = Long.valueOf(endTime);
+        endTimeValue = endTimeValue - endTimeValue % (60);
+        Timestamp endTimedb = new Timestamp(endTimeValue * 1000);
+
+        exit_status = jobData.get("Exit_status");
+        hosts = jobData.get("exec_host");
+        String[] items2 = hosts.split("[+]");
+        int num = 0;
+        for (int i = 0; i < items2.length; i++) {
+          String machinetemp = items2[i];
+          if (machinetemp.length() >= 3) {
+            String machine = items2[i].substring(0, items2[i].length() - 2);
+            StringBuffer data = new StringBuffer();
+            data.append("HodId=").append(hodId);
+            data.append(", Machine=").append(machine);
+            if (domain != null) {
+              data.append(".").append(domain);
             }
+            log.info(data.toString());
+            num++;
+          }
         }
-        boolean finished=findResult|tracedone;	   
-        return finished;
-	 }
-	 
-		 
-		 
-	 private void process_data() throws SQLException{
-	
-		 long currentTime=System.currentTimeMillis();
-		 currentTime=currentTime-currentTime%(60*1000);
-		 Timestamp timestamp=new Timestamp(currentTime);
-		 
-		 Set<String> hodIds=currentHodJobs.keySet();
-		 
-		 Iterator<String> hodIdsIt=hodIds.iterator();
-		 while (hodIdsIt.hasNext()){
-			 String hodId=(String) hodIdsIt.next();
-			 TreeMap<String,String> aJobData=currentHodJobs.get(hodId);
-			 String status=aJobData.get("status");
-			 String process=aJobData.get("process");
-			 if (process.equals("0") && (status.equals("R") ||status.equals("E"))){
-			     try {
-			    	 boolean result=loadQstatData(hodId);
-			    	 if (result){
-			    		 aJobData.put("process","1");
-				    	 currentHodJobs.put(hodId, aJobData);			    		 
-			    	 }
-			     }catch (IOException ioe){
-			    	 log.error("load qsat data Error:"+ioe.getMessage());
-			    	  
-			     }
-			 }
-			 if (! process.equals("2") && status.equals("C")){
-				 try {
-			    	boolean result=loadTraceJobData(hodId);
-			    	
-			    	if (result){
-			    		aJobData.put("process","2");
-			    		currentHodJobs.put(hodId, aJobData);
-			    	}
-				 }catch (IOException ioe){
-					 log.error("loadTraceJobData Error:"+ioe.getMessage());
-				 }
-			 }//if
-			
-			 
-		 } //while
-		 
-	 }
-	 	 
-	 private void handle_jobData() throws SQLException{		 
-		 try {
-		     getHodJobInfo();
-		 }catch (IOException ex){
-			 log.error("getQueueInfo Error:"+ex.getMessage());
-			 return;
-		 }
-		 try {    
-	         process_data();
-		 } catch (SQLException ex){
-			 log.error("process_data Error:"+ex.getMessage());
-			 throw ex;
-		 }
-	 }
-     	 
-	 public void run_forever() throws SQLException{    	            
-     	  while(true){
-          	  handle_jobData();
-              try {
-                  log.debug("sleeping ...");
-                  Thread.sleep(this.intervalValue*1000);
-              } catch (InterruptedException e) {
-                  log.error(e.getMessage()); 	
-              }
+
+        StringBuffer data = new StringBuffer();
+        data.append("HodID=").append(hodId);
+        data.append(", UserId=").append(userId);
+        data.append(", Status=").append(exit_status);
+        data.append(", TimeQueued=").append(timeQueued);
+        data.append(", StartTime=").append(startTimedb);
+        data.append(", EndTime=").append(endTimedb);
+        data.append(", NumOfMachines=").append(num);
+        log.info(data.toString());
+        findResult = true;
+        log.debug(" hod info for job " + hodId + " has been loaded ");
+      }// if
+
+    }// while
+
+    try {
+      errorHandler.join();
+    } catch (InterruptedException ie) {
+      log.error(ie.getMessage());
+    }
+
+    timeout.cancel();
+    boolean tracedone = false;
+    if (!findResult) {
+
+      String traceCheckCount = aJobData.get("traceCheckCount");
+      int traceCheckCountValue = Integer.valueOf(traceCheckCount);
+      traceCheckCountValue = traceCheckCountValue + 1;
+      aJobData.put("traceCheckCount", String.valueOf(traceCheckCountValue));
+
+      log.debug("did not find tracejob info for job " + hodId + ", after "
+          + traceCheckCountValue + " times checking");
+      if (traceCheckCountValue >= 2) {
+        tracedone = true;
+      }
+    }
+    boolean finished = findResult || tracedone;
+    return finished;
+  }
+
+  private void process_data() throws SQLException {
+
+    long currentTime = System.currentTimeMillis();
+    currentTime = currentTime - currentTime % (60 * 1000);
+    Timestamp timestamp = new Timestamp(currentTime);
+
+    Set<String> hodIds = currentHodJobs.keySet();
+
+    Iterator<String> hodIdsIt = hodIds.iterator();
+    while (hodIdsIt.hasNext()) {
+      String hodId = hodIdsIt.next();
+      TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
+      String status = aJobData.get("status");
+      String process = aJobData.get("process");
+      if (process.equals("0") && (status.equals("R") || status.equals("E"))) {
+        try {
+          boolean result = loadQstatData(hodId);
+          if (result) {
+            aJobData.put("process", "1");
+            currentHodJobs.put(hodId, aJobData);
           }
-     }
+        } catch (IOException ioe) {
+          log.error("load qstat data Error:" + ioe.getMessage());
+
+        }
+      }
+      if (!process.equals("2") && status.equals("C")) {
+        try {
+          boolean result = loadTraceJobData(hodId);
+
+          if (result) {
+            aJobData.put("process", "2");
+            currentHodJobs.put(hodId, aJobData);
+          }
+        } catch (IOException ioe) {
+          log.error("loadTraceJobData Error:" + ioe.getMessage());
+        }
+      }// if
+
+    } // while
+
+  }
+
+  private void handle_jobData() throws SQLException {
+    try {
+      getHodJobInfo();
+    } catch (IOException ex) {
+      log.error("getHodJobInfo Error:" + ex.getMessage());
+      return;
+    }
+    try {
+      process_data();
+    } catch (SQLException ex) {
+      log.error("process_data Error:" + ex.getMessage());
+      throw ex;
+    }
+  }
+
+  public void run_forever() throws SQLException {
+    while (true) {
+      handle_jobData();
+      try {
+        log.debug("sleeping ...");
+        Thread.sleep(this.intervalValue * 1000);
+      } catch (InterruptedException e) {
+        log.error(e.getMessage());
+      }
+    }
+  }
 
-	 public void shutdown(){
-     }
+  public void shutdown() {
+  }
 }

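All three collection methods in TorqueInfoProcessor above (getHodJobInfo, loadQstatData, loadTraceJobData) share one pattern: build a Torque command line to run over ssh, start it with ProcessBuilder, arm a Timer-based watchdog (TorqueTimerTask, below) that destroys the child if it hangs, then drain stdout on the calling thread while an ErStreamHandler drains stderr. A condensed sketch of that pattern, assuming a plain anonymous TimerTask in place of TorqueTimerTask and omitting the stderr handler; the runRemote helper and its hard-coded timeout are illustrative only:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

public class RemoteCommandSketch {
  static final long TIMEOUT_MS = 180 * 1000L; // matches timeoutInterval below

  public static List<String> runRemote(String server, String command)
      throws IOException {
    ProcessBuilder pb = new ProcessBuilder("ssh", server, command);
    final Process p = pb.start();

    // Watchdog: destroy the child process if it outlives the timeout,
    // so a wedged qstat/tracejob cannot block the collection loop forever.
    Timer watchdog = new Timer(true);
    watchdog.schedule(new TimerTask() {
      public void run() {
        p.destroy();
      }
    }, TIMEOUT_MS);

    List<String> lines = new ArrayList<String>();
    BufferedReader stdout = new BufferedReader(
        new InputStreamReader(p.getInputStream()));
    try {
      String line;
      while ((line = stdout.readLine()) != null) {
        lines.add(line);
      }
    } finally {
      stdout.close();
      watchdog.cancel(); // command finished in time; disarm the watchdog
    }
    return lines;
  }
}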
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java Wed Mar 11 22:39:26 2009
@@ -17,34 +17,35 @@
  */
 package org.apache.hadoop.chukwa.inputtools.mdl;
 
+
 import java.util.TimerTask;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-public class TorqueTimerTask extends TimerTask{
-	private Process ps=null;
-	private String command;
-	
-	private static Log log = LogFactory.getLog(TorqueTimerTask.class);
-    //public static int timeoutInterval=300;
-    public static int timeoutInterval=180;
-    
-	public TorqueTimerTask() {
-		super();
-		// TODO Auto-generated constructor stub
-	}
-	
-	public  TorqueTimerTask(Process process,String command){
-    	super();
-    	this.ps=process;
-    	this.command=command;
-    	
-    }
-	
-	public void run() {
-		ps.destroy();
-	    log.error("torque command: "+command+" timed out");
-		
-	}
+public class TorqueTimerTask extends TimerTask {
+  private Process ps = null;
+  private String command;
+
+  private static Log log = LogFactory.getLog(TorqueTimerTask.class);
+  // public static int timeoutInterval=300;
+  public static int timeoutInterval = 180;
+
+  public TorqueTimerTask() {
+    super();
+  }
+
+  public TorqueTimerTask(Process process, String command) {
+    super();
+    this.ps = process;
+    this.command = command;
+
+  }
+
+  public void run() {
+    ps.destroy();
+    log.error("torque command: " + command + " timed out");
+
+  }
 
 }
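For completeness, this is how TorqueInfoProcessor wires the class up: it schedules a TorqueTimerTask against each freshly started Process so that a hung qstat or tracejob invocation is destroyed after timeoutInterval seconds, then cancels the timer once the output has been read. A minimal usage sketch under those assumptions; the "sleep 600" command is just a stand-in for a slow remote job:

import java.util.Timer;
import org.apache.hadoop.chukwa.inputtools.mdl.TorqueTimerTask;

public class TorqueTimerTaskUsage {
  public static void main(String[] args) throws Exception {
    Process p = new ProcessBuilder("sleep", "600").start();

    Timer timeout = new Timer();
    timeout.schedule(new TorqueTimerTask(p, "sleep 600"),
        TorqueTimerTask.timeoutInterval * 1000);

    p.waitFor();      // returns early once the watchdog destroys the process
    timeout.cancel(); // always cancel so the timer thread does not linger
  }
}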