You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by as...@apache.org on 2009/03/11 23:39:32 UTC
svn commit: r752666 [13/16] - in /hadoop/chukwa/trunk: ./
src/java/org/apache/hadoop/chukwa/ src/java/org/apache/hadoop/chukwa/conf/
src/java/org/apache/hadoop/chukwa/database/
src/java/org/apache/hadoop/chukwa/datacollection/
src/java/org/apache/hadoo...
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java Wed Mar 11 22:39:26 2009
@@ -5,10 +5,9 @@
* License version 1.1, a copy of which has been included with this
* distribution in the LICENSE.txt file. */
-
-
package org.apache.hadoop.chukwa.inputtools.log4j;
+
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
@@ -22,7 +21,6 @@
import java.util.Locale;
import java.util.TimeZone;
import java.util.regex.Pattern;
-
import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
import org.apache.hadoop.chukwa.util.RecordConstants;
import org.apache.log4j.FileAppender;
@@ -135,57 +133,56 @@
<b>DatePattern</b> option. The text before the colon is interpeted
as the protocol specificaion of a URL which is probably not what
you want. */
-
+
public class ChukwaDailyRollingFileAppender extends FileAppender {
- static Logger log = Logger.getLogger(ChukwaDailyRollingFileAppender.class);
+ static Logger log = Logger.getLogger(ChukwaDailyRollingFileAppender.class);
// The code assumes that the following constants are in a increasing
// sequence.
- static final int TOP_OF_TROUBLE=-1;
+ static final int TOP_OF_TROUBLE = -1;
static final int TOP_OF_MINUTE = 0;
- static final int TOP_OF_HOUR = 1;
- static final int HALF_DAY = 2;
- static final int TOP_OF_DAY = 3;
- static final int TOP_OF_WEEK = 4;
- static final int TOP_OF_MONTH = 5;
+ static final int TOP_OF_HOUR = 1;
+ static final int HALF_DAY = 2;
+ static final int TOP_OF_DAY = 3;
+ static final int TOP_OF_WEEK = 4;
+ static final int TOP_OF_MONTH = 5;
static final String adaptorType = ChukwaAgentController.CharFileTailUTF8NewLineEscaped;
static final Object lock = new Object();
static String lastRotation = "";
-
+
/**
- The date pattern. By default, the pattern is set to
- "'.'yyyy-MM-dd" meaning daily rollover.
+ * The date pattern. By default, the pattern is set to "'.'yyyy-MM-dd" meaning
+ * daily rollover.
*/
private String datePattern = "'.'yyyy-MM-dd";
/**
- The log file will be renamed to the value of the
- scheduledFilename variable when the next interval is entered. For
- example, if the rollover period is one hour, the log file will be
- renamed to the value of "scheduledFilename" at the beginning of
- the next hour.
-
- The precise time when a rollover occurs depends on logging
- activity.
+ * The log file will be renamed to the value of the scheduledFilename variable
+ * when the next interval is entered. For example, if the rollover period is
+ * one hour, the log file will be renamed to the value of "scheduledFilename"
+ * at the beginning of the next hour.
+ *
+ * The precise time when a rollover occurs depends on logging activity.
*/
private String scheduledFilename;
/**
- The next time we estimate a rollover should occur. */
- private long nextCheck = System.currentTimeMillis () - 1;
+ * The next time we estimate a rollover should occur.
+ */
+ private long nextCheck = System.currentTimeMillis() - 1;
/**
* Regex to select log files to be deleted
*/
private String cleanUpRegex = null;
-
+
/**
* Set the maximum number of backup files to keep around.
*/
private int maxBackupIndex = 10;
-
+
Date now = new Date();
SimpleDateFormat sdf;
@@ -197,45 +194,43 @@
ChukwaAgentController chukwaClient;
boolean chukwaClientIsNull = true;
static final Object chukwaLock = new Object();
-
+
String chukwaClientHostname;
int chukwaClientPortNum;
long chukwaClientConnectNumRetry;
long chukwaClientConnectRetryInterval;
String recordType;
-
-
-
-
+
// The gmtTimeZone is used only in computeCheckPeriod() method.
static final TimeZone gmtTimeZone = TimeZone.getTimeZone("GMT");
-
/**
- The default constructor does nothing. */
- public ChukwaDailyRollingFileAppender() throws IOException{
+ * The default constructor does nothing.
+ */
+ public ChukwaDailyRollingFileAppender() throws IOException {
super();
}
- /**
+/**
Instantiate a <code>DailyRollingFileAppender</code> and open the
file designated by <code>filename</code>. The opened filename will
become the output destination for this appender.
*/
- public ChukwaDailyRollingFileAppender (Layout layout, String filename,
- String datePattern) throws IOException {
+ public ChukwaDailyRollingFileAppender(Layout layout, String filename,
+ String datePattern) throws IOException {
super(layout, filename, true);
- System.out.println("Daily Rolling File Appender successfully registered file with agent: " + filename);
+ System.out
+ .println("Daily Rolling File Appender successfully registered file with agent: "
+ + filename);
this.datePattern = datePattern;
activateOptions();
}
/**
- The <b>DatePattern</b> takes a string in the same format as
- expected by {@link SimpleDateFormat}. This options determines the
- rollover schedule.
+ * The <b>DatePattern</b> takes a string in the same format as expected by
+ * {@link SimpleDateFormat}. This options determines the rollover schedule.
*/
public void setDatePattern(String pattern) {
datePattern = pattern;
@@ -245,66 +240,64 @@
public String getDatePattern() {
return datePattern;
}
-
- public String getRecordType(){
+
+ public String getRecordType() {
if (recordType != null)
return recordType;
else
return "unknown";
}
-
- public void setRecordType(String recordType){
+
+ public void setRecordType(String recordType) {
this.recordType = recordType;
}
public void activateOptions() {
super.activateOptions();
- if(datePattern != null && fileName != null) {
+ if (datePattern != null && fileName != null) {
now.setTime(System.currentTimeMillis());
sdf = new SimpleDateFormat(datePattern);
int type = computeCheckPeriod();
printPeriodicity(type);
rc.setType(type);
File file = new File(fileName);
- scheduledFilename = fileName+sdf.format(new Date(file.lastModified()));
+ scheduledFilename = fileName + sdf.format(new Date(file.lastModified()));
} else {
- LogLog.error("Either File or DatePattern options are not set for appender ["
- +name+"].");
+ LogLog
+ .error("Either File or DatePattern options are not set for appender ["
+ + name + "].");
}
}
void printPeriodicity(int type) {
- switch(type) {
+ switch (type) {
case TOP_OF_MINUTE:
- LogLog.debug("Appender ["+name+"] to be rolled every minute.");
+ LogLog.debug("Appender [" + name + "] to be rolled every minute.");
break;
case TOP_OF_HOUR:
- LogLog.debug("Appender ["+name
- +"] to be rolled on top of every hour.");
+ LogLog
+ .debug("Appender [" + name + "] to be rolled on top of every hour.");
break;
case HALF_DAY:
- LogLog.debug("Appender ["+name
- +"] to be rolled at midday and midnight.");
+ LogLog.debug("Appender [" + name
+ + "] to be rolled at midday and midnight.");
break;
case TOP_OF_DAY:
- LogLog.debug("Appender ["+name
- +"] to be rolled at midnight.");
+ LogLog.debug("Appender [" + name + "] to be rolled at midnight.");
break;
case TOP_OF_WEEK:
- LogLog.debug("Appender ["+name
- +"] to be rolled at start of week.");
+ LogLog.debug("Appender [" + name + "] to be rolled at start of week.");
break;
case TOP_OF_MONTH:
- LogLog.debug("Appender ["+name
- +"] to be rolled at start of every month.");
+ LogLog.debug("Appender [" + name
+ + "] to be rolled at start of every month.");
break;
default:
- LogLog.warn("Unknown periodicity for appender ["+name+"].");
+ LogLog.warn("Unknown periodicity for appender [" + name + "].");
}
}
-
// This method computes the roll over period by looping over the
// periods, starting with the shortest, and stopping when the r0 is
// different from from r1, where r0 is the epoch formatted according
@@ -315,19 +308,21 @@
// GMT (the epoch).
int computeCheckPeriod() {
- RollingCalendar rollingCalendar = new RollingCalendar(gmtTimeZone, Locale.ENGLISH);
+ RollingCalendar rollingCalendar = new RollingCalendar(gmtTimeZone,
+ Locale.ENGLISH);
// set sate to 1970-01-01 00:00:00 GMT
Date epoch = new Date(0);
- if(datePattern != null) {
- for(int i = TOP_OF_MINUTE; i <= TOP_OF_MONTH; i++) {
+ if (datePattern != null) {
+ for (int i = TOP_OF_MINUTE; i <= TOP_OF_MONTH; i++) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(datePattern);
- simpleDateFormat.setTimeZone(gmtTimeZone); // do all date formatting in GMT
+ simpleDateFormat.setTimeZone(gmtTimeZone); // do all date formatting in
+ // GMT
String r0 = simpleDateFormat.format(epoch);
rollingCalendar.setType(i);
Date next = new Date(rollingCalendar.getNextCheckMillis(epoch));
- String r1 = simpleDateFormat.format(next);
- //System.out.println("Type = "+i+", r0 = "+r0+", r1 = "+r1);
- if(r0 != null && r1 != null && !r0.equals(r1)) {
+ String r1 = simpleDateFormat.format(next);
+ // System.out.println("Type = "+i+", r0 = "+r0+", r1 = "+r1);
+ if (r0 != null && r1 != null && !r0.equals(r1)) {
return i;
}
}
@@ -336,7 +331,7 @@
}
/**
- Rollover the current file to a new file.
+ * Rollover the current file to a new file.
*/
void rollOver() throws IOException {
@@ -346,7 +341,7 @@
return;
}
- String datedFilename = fileName+sdf.format(now);
+ String datedFilename = fileName + sdf.format(now);
// It is too early to roll over because we are still within the
// bounds of the current interval. Rollover will occur once the
// next interval is reached.
@@ -354,38 +349,35 @@
return;
}
-
// close current file, and rename it to datedFilename
this.closeFile();
-
- File target = new File(scheduledFilename);
+ File target = new File(scheduledFilename);
if (target.exists()) {
target.delete();
}
File file = new File(fileName);
-
+
boolean result = file.renameTo(target);
- if(result) {
- LogLog.debug(fileName +" -> "+ scheduledFilename);
+ if (result) {
+ LogLog.debug(fileName + " -> " + scheduledFilename);
} else {
- LogLog.error("Failed to rename ["+fileName+"] to ["+scheduledFilename+"].");
+ LogLog.error("Failed to rename [" + fileName + "] to ["
+ + scheduledFilename + "].");
}
try {
// This will also close the file. This is OK since multiple
// close operations are safe.
this.setFile(fileName, false, this.bufferedIO, this.bufferSize);
+ } catch (IOException e) {
+ errorHandler.error("setFile(" + fileName + ", false) call failed.");
}
- catch(IOException e) {
- errorHandler.error("setFile("+fileName+", false) call failed.");
- }
scheduledFilename = datedFilename;
cleanUp();
}
-
public String getCleanUpRegex() {
return cleanUpRegex;
}
@@ -406,194 +398,194 @@
String regex = "";
try {
File actualFile = new File(fileName);
-
+
String directoryName = actualFile.getParent();
String actualFileName = actualFile.getName();
File dirList = new File(directoryName);
-
-
+
if (cleanUpRegex == null || !cleanUpRegex.contains("$fileName")) {
- LogLog.error("cleanUpRegex == null || !cleanUpRegex.contains(\"$fileName\")");
+ LogLog
+ .error("cleanUpRegex == null || !cleanUpRegex.contains(\"$fileName\")");
return;
}
- regex =cleanUpRegex.replace("$fileName", actualFileName);
- String[] dirFiles = dirList.list(new LogFilter(actualFileName,regex));
-
+ regex = cleanUpRegex.replace("$fileName", actualFileName);
+ String[] dirFiles = dirList.list(new LogFilter(actualFileName, regex));
+
List<String> files = new ArrayList<String>();
- for(String file: dirFiles) {
- files.add(file);
+ for (String file : dirFiles) {
+ files.add(file);
}
Collections.sort(files);
-
- while(files.size() > maxBackupIndex) {
+
+ while (files.size() > maxBackupIndex) {
String file = files.remove(0);
- File f = new File(directoryName + "/" +file);
+ File f = new File(directoryName + "/" + file);
f.delete();
- LogLog.debug("Removing: " +file);
+ LogLog.debug("Removing: " + file);
}
- } catch(Exception e) {
- errorHandler.error("cleanUp("+fileName+"," + regex +") call failed.");
+ } catch (Exception e) {
+ errorHandler
+ .error("cleanUp(" + fileName + "," + regex + ") call failed.");
}
}
-
+
private class LogFilter implements FilenameFilter {
private Pattern p = null;
private String logFile = null;
-
- public LogFilter(String logFile,String regex) {
+
+ public LogFilter(String logFile, String regex) {
this.logFile = logFile;
- p = Pattern.compile(regex);
+ p = Pattern.compile(regex);
}
-
+
@Override
public boolean accept(File dir, String name) {
// ignore current log file
- if (name.intern() == this.logFile.intern() ) {
+ if (name.intern() == this.logFile.intern()) {
return false;
}
- //ignore file without the same prefix
+ // ignore file without the same prefix
if (!name.startsWith(logFile)) {
return false;
}
return p.matcher(name).find();
}
}
-
- private class ClientFinalizer extends Thread
- {
- private ChukwaAgentController chukwaClient = null;
- private String recordType = null;
- private String fileName = null;
- public ClientFinalizer(ChukwaAgentController chukwaClient,String recordType, String fileName)
- {
- this.chukwaClient = chukwaClient;
- this.recordType = recordType;
- this.fileName = fileName;
- }
- public synchronized void run()
- {
- try
- {
- if (chukwaClient != null)
- {
- log.debug("ShutdownHook: removing:" + fileName);
- chukwaClient.removeFile(recordType, fileName);
- }
- else
- {
- LogLog.warn("chukwaClient is null cannot do any cleanup");
- }
- }
- catch (Throwable e)
- {
- LogLog.warn("closing the controller threw an exception:\n" + e);
- }
- }
- }
- private ClientFinalizer clientFinalizer = null;
-
+
+ private class ClientFinalizer extends Thread {
+ private ChukwaAgentController chukwaClient = null;
+ private String recordType = null;
+ private String fileName = null;
+
+ public ClientFinalizer(ChukwaAgentController chukwaClient,
+ String recordType, String fileName) {
+ this.chukwaClient = chukwaClient;
+ this.recordType = recordType;
+ this.fileName = fileName;
+ }
+
+ public synchronized void run() {
+ try {
+ if (chukwaClient != null) {
+ log.debug("ShutdownHook: removing:" + fileName);
+ chukwaClient.removeFile(recordType, fileName);
+ } else {
+ LogLog.warn("chukwaClient is null cannot do any cleanup");
+ }
+ } catch (Throwable e) {
+ LogLog.warn("closing the controller threw an exception:\n" + e);
+ }
+ }
+ }
+
+ private ClientFinalizer clientFinalizer = null;
+
/**
- * This method differentiates DailyRollingFileAppender from its
- * super class.
- *
- * <p>Before actually logging, this method will check whether it is
- * time to do a rollover. If it is, it will schedule the next
- * rollover time and then rollover.
+ * This method differentiates DailyRollingFileAppender from its super class.
+ *
+ * <p>Before actually logging, this method will check whether it is time to do
+ * a rollover. If it is, it will schedule the next rollover time and then
+ * rollover.
* */
- protected void subAppend(LoggingEvent event)
- {
- try
- {
- //we set up the chukwa adaptor here because this is the first
- //point which is called after all setters have been called with
- //their values from the log4j.properties file, in particular we
- //needed to give setCukwaClientPortNum() and -Hostname() a shot
-
- // Make sure only one thread can do this
- // and use the boolean to avoid the first level locking
- if (chukwaClientIsNull)
- {
- synchronized(chukwaLock)
- {
- if (chukwaClient == null){
- if (getChukwaClientHostname() != null && getChukwaClientPortNum() != 0){
- chukwaClient = new ChukwaAgentController(getChukwaClientHostname(), getChukwaClientPortNum());
- log.debug("setup adaptor with hostname " + getChukwaClientHostname() + " and portnum " + getChukwaClientPortNum());
- }
- else{
- chukwaClient = new ChukwaAgentController();
- log.debug("setup adaptor with no args, which means it used its defaults");
- }
-
- chukwaClientIsNull = false;
-
- // Watchdog is watching for ChukwaAgent only once every 5 minutes, so there's no point in retrying more than once every 5 mins.
- // In practice, if the watchdog is not able to automatically restart the agent, it will take more than 20 minutes to get Ops to restart it.
- // Also its a good to limit the number of communications between Hadoop and Chukwa, that's why 30 minutes.
- long retryInterval = chukwaClientConnectRetryInterval;
- if (retryInterval == 0) {
- retryInterval = 1000 * 60 * 30;
- }
- long numRetries = chukwaClientConnectNumRetry;
- if (numRetries == 0) {
- numRetries = 48;
- }
- String log4jFileName = getFile();
- String recordType = getRecordType();
- long adaptorID = chukwaClient.addFile(recordType, log4jFileName, numRetries, retryInterval);
-
- // Setup a shutdownHook for the controller
- clientFinalizer = new ClientFinalizer(chukwaClient,recordType,log4jFileName);
- Runtime.getRuntime().addShutdownHook(clientFinalizer);
-
-
- if (adaptorID > 0){
- log.debug("Added file tailing adaptor to chukwa agent for file " + log4jFileName + "using this recordType :" + recordType);
- }
- else{
- log.debug("Chukwa adaptor not added, addFile(" + log4jFileName + ") returned " + adaptorID);
- }
-
- }
- }
- }
-
-
- long n = System.currentTimeMillis();
- if (n >= nextCheck) {
- now.setTime(n);
- nextCheck = rc.getNextCheckMillis(now);
- try {
- rollOver();
- }
- catch(IOException ioe) {
- LogLog.error("rollOver() failed.", ioe);
- }
- }
- //escape the newlines from record bodies and then write this record to the log file
- this.qw.write(RecordConstants.escapeAllButLastRecordSeparator("\n",this.layout.format(event)));
-
- if(layout.ignoresThrowable()) {
- String[] s = event.getThrowableStrRep();
- if (s != null) {
- int len = s.length;
- for(int i = 0; i < len; i++) {
- this.qw.write(s[i]);
- this.qw.write(Layout.LINE_SEP);
- }
- }
- }
-
- if(this.immediateFlush) {
- this.qw.flush();
- }
- }
- catch(Throwable e)
- {
- System.err.println("Exception in ChukwaRollingAppender: " + e.getMessage());
- e.printStackTrace();
- }
-
+ protected void subAppend(LoggingEvent event) {
+ try {
+ // we set up the chukwa adaptor here because this is the first
+ // point which is called after all setters have been called with
+ // their values from the log4j.properties file, in particular we
+ // needed to give setCukwaClientPortNum() and -Hostname() a shot
+
+ // Make sure only one thread can do this
+ // and use the boolean to avoid the first level locking
+ if (chukwaClientIsNull) {
+ synchronized (chukwaLock) {
+ if (chukwaClient == null) {
+ if (getChukwaClientHostname() != null
+ && getChukwaClientPortNum() != 0) {
+ chukwaClient = new ChukwaAgentController(
+ getChukwaClientHostname(), getChukwaClientPortNum());
+ log.debug("setup adaptor with hostname "
+ + getChukwaClientHostname() + " and portnum "
+ + getChukwaClientPortNum());
+ } else {
+ chukwaClient = new ChukwaAgentController();
+ log
+ .debug("setup adaptor with no args, which means it used its defaults");
+ }
+
+ chukwaClientIsNull = false;
+
+ // Watchdog is watching for ChukwaAgent only once every 5 minutes,
+ // so there's no point in retrying more than once every 5 mins.
+ // In practice, if the watchdog is not able to automatically restart
+ // the agent, it will take more than 20 minutes to get Ops to
+ // restart it.
+ // Also its a good to limit the number of communications between
+ // Hadoop and Chukwa, that's why 30 minutes.
+ long retryInterval = chukwaClientConnectRetryInterval;
+ if (retryInterval == 0) {
+ retryInterval = 1000 * 60 * 30;
+ }
+ long numRetries = chukwaClientConnectNumRetry;
+ if (numRetries == 0) {
+ numRetries = 48;
+ }
+ String log4jFileName = getFile();
+ String recordType = getRecordType();
+ long adaptorID = chukwaClient.addFile(recordType, log4jFileName,
+ numRetries, retryInterval);
+
+ // Setup a shutdownHook for the controller
+ clientFinalizer = new ClientFinalizer(chukwaClient, recordType,
+ log4jFileName);
+ Runtime.getRuntime().addShutdownHook(clientFinalizer);
+
+ if (adaptorID > 0) {
+ log.debug("Added file tailing adaptor to chukwa agent for file "
+ + log4jFileName + "using this recordType :" + recordType);
+ } else {
+ log.debug("Chukwa adaptor not added, addFile(" + log4jFileName
+ + ") returned " + adaptorID);
+ }
+
+ }
+ }
+ }
+
+ long n = System.currentTimeMillis();
+ if (n >= nextCheck) {
+ now.setTime(n);
+ nextCheck = rc.getNextCheckMillis(now);
+ try {
+ rollOver();
+ } catch (IOException ioe) {
+ LogLog.error("rollOver() failed.", ioe);
+ }
+ }
+ // escape the newlines from record bodies and then write this record to
+ // the log file
+ this.qw.write(RecordConstants.escapeAllButLastRecordSeparator("\n",
+ this.layout.format(event)));
+
+ if (layout.ignoresThrowable()) {
+ String[] s = event.getThrowableStrRep();
+ if (s != null) {
+ int len = s.length;
+ for (int i = 0; i < len; i++) {
+ this.qw.write(s[i]);
+ this.qw.write(Layout.LINE_SEP);
+ }
+ }
+ }
+
+ if (this.immediateFlush) {
+ this.qw.flush();
+ }
+ } catch (Throwable e) {
+ System.err.println("Exception in ChukwaRollingAppender: "
+ + e.getMessage());
+ e.printStackTrace();
+ }
+
}
public String getChukwaClientHostname() {
@@ -612,38 +604,37 @@
this.chukwaClientPortNum = chukwaClientPortNum;
}
- public void setChukwaClientConnectNumRetry(int i){
+ public void setChukwaClientConnectNumRetry(int i) {
this.chukwaClientConnectNumRetry = i;
}
-
+
public void setChukwaClientConnectRetryInterval(long i) {
this.chukwaClientConnectRetryInterval = i;
}
-
-}
+}
/**
- * RollingCalendar is a helper class to DailyRollingFileAppender.
- * Given a periodicity type and the current time, it computes the
- * start of the next interval.
+ * RollingCalendar is a helper class to DailyRollingFileAppender. Given a
+ * periodicity type and the current time, it computes the start of the next
+ * interval.
* */
class RollingCalendar extends GregorianCalendar {
/**
*
*/
- private static final long serialVersionUID = 2153481574198792767L;
-int type = ChukwaDailyRollingFileAppender.TOP_OF_TROUBLE;
+ private static final long serialVersionUID = 2153481574198792767L;
+ int type = ChukwaDailyRollingFileAppender.TOP_OF_TROUBLE;
RollingCalendar() {
super();
- }
+ }
RollingCalendar(TimeZone tz, Locale locale) {
super(tz, locale);
- }
+ }
void setType(int type) {
this.type = type;
@@ -656,7 +647,7 @@
public Date getNextCheckDate(Date now) {
this.setTime(now);
- switch(type) {
+ switch (type) {
case ChukwaDailyRollingFileAppender.TOP_OF_MINUTE:
this.set(Calendar.SECOND, 0);
this.set(Calendar.MILLISECOND, 0);
@@ -673,7 +664,7 @@
this.set(Calendar.SECOND, 0);
this.set(Calendar.MILLISECOND, 0);
int hour = get(Calendar.HOUR_OF_DAY);
- if(hour < 12) {
+ if (hour < 12) {
this.set(Calendar.HOUR_OF_DAY, 12);
} else {
this.set(Calendar.HOUR_OF_DAY, 0);
@@ -707,4 +698,3 @@
return getTime();
}
}
-
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4JMetricsContext.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4JMetricsContext.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4JMetricsContext.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4JMetricsContext.java Wed Mar 11 22:39:26 2009
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.chukwa.inputtools.log4j;
+
import java.io.*;
import java.util.Enumeration;
import java.util.logging.LogManager;
@@ -34,28 +35,27 @@
public class Log4JMetricsContext extends AbstractMetricsContext {
- Logger out = null; //Logger.getLogger(Log4JMetricsContext.class);
+ Logger out = null; // Logger.getLogger(Log4JMetricsContext.class);
static final Object lock = new Object();
-
+
/* Configuration attribute names */
-// protected static final String FILE_NAME_PROPERTY = "fileName";
+ // protected static final String FILE_NAME_PROPERTY = "fileName";
protected static final String PERIOD_PROPERTY = "period";
- private static final String metricsLogDir = System.getProperty("hadoop.log.dir");
+ private static final String metricsLogDir = System
+ .getProperty("hadoop.log.dir");
private static final String user = System.getProperty("user.name");
-
/** Creates a new instance of FileContext */
- public Log4JMetricsContext() {}
-
+ public Log4JMetricsContext() {
+ }
+
public void init(String contextName, ContextFactory factory) {
super.init(contextName, factory);
- /*
- String fileName = getAttribute(FILE_NAME_PROPERTY);
- if (fileName != null) {
- file = new File(fileName);
- }
- */
-
+ /*
+ * String fileName = getAttribute(FILE_NAME_PROPERTY); if (fileName != null)
+ * { file = new File(fileName); }
+ */
+
String periodStr = getAttribute(PERIOD_PROPERTY);
if (periodStr != null) {
int period = 0;
@@ -69,63 +69,82 @@
setPeriod(period);
}
}
-
+
@Override
- protected void emitRecord(String contextName, String recordName, OutputRecord outRec)
- throws IOException {
- if (out == null) {
- synchronized(lock) {
- if (out == null) {
- java.util.Properties properties = new java.util.Properties();
- properties.load(this.getClass().getClassLoader().getResourceAsStream("chukwa-hadoop-metrics-log4j.properties"));
- Logger logger = Logger.getLogger(Log4JMetricsContext.class);
- logger.setAdditivity(false);
- PatternLayout layout = new PatternLayout(properties.getProperty("log4j.appender.chukwa."+contextName+".layout.ConversionPattern"));
- org.apache.hadoop.chukwa.inputtools.log4j.ChukwaDailyRollingFileAppender appender =
- new org.apache.hadoop.chukwa.inputtools.log4j.ChukwaDailyRollingFileAppender();
- appender.setName("chukwa."+contextName);
- appender.setLayout(layout);
- appender.setAppend(true);
- if(properties.getProperty("log4j.appender.chukwa."+contextName+".Dir")!=null) {
- String logName = properties.getProperty("log4j.appender.chukwa."+contextName+".Dir")
- +File.separator+"chukwa-"+user+"-"+contextName + "-" + System.currentTimeMillis() +".log";
-
- // FIXME: Hack to make the log file readable by chukwa user.
- if(System.getProperty("os.name").intern()=="Linux".intern()) {
- Runtime.getRuntime().exec("chmod 640 "+logName);
- }
- appender.setFile(logName);
- } else {
- appender.setFile(metricsLogDir+File.separator+"chukwa-"+user+"-"
- +contextName + "-" + System.currentTimeMillis()+ ".log");
- }
- appender.activateOptions();
- appender.setRecordType(properties.getProperty("log4j.appender.chukwa."+contextName+".recordType"));
- appender.setChukwaClientHostname(properties.getProperty("log4j.appender.chukwa."+contextName+".chukwaClientHostname"));
- appender.setChukwaClientPortNum(Integer.parseInt(properties.getProperty("log4j.appender.chukwa."+contextName+".chukwaClientPortNum")));
- appender.setDatePattern(properties.getProperty("log4j.appender.chukwa."+contextName+".DatePattern"));
- logger.addAppender(appender);
- out = logger;
- }
- }
- }
-
-
- JSONObject json = new JSONObject();
+ protected void emitRecord(String contextName, String recordName,
+ OutputRecord outRec) throws IOException {
+ if (out == null) {
+ synchronized (lock) {
+ if (out == null) {
+ java.util.Properties properties = new java.util.Properties();
+ properties.load(this.getClass().getClassLoader().getResourceAsStream(
+ "chukwa-hadoop-metrics-log4j.properties"));
+ Logger logger = Logger.getLogger(Log4JMetricsContext.class);
+ logger.setAdditivity(false);
+ PatternLayout layout = new PatternLayout(properties
+ .getProperty("log4j.appender.chukwa." + contextName
+ + ".layout.ConversionPattern"));
+ org.apache.hadoop.chukwa.inputtools.log4j.ChukwaDailyRollingFileAppender appender = new org.apache.hadoop.chukwa.inputtools.log4j.ChukwaDailyRollingFileAppender();
+ appender.setName("chukwa." + contextName);
+ appender.setLayout(layout);
+ appender.setAppend(true);
+ if (properties.getProperty("log4j.appender.chukwa." + contextName
+ + ".Dir") != null) {
+ String logName = properties.getProperty("log4j.appender.chukwa."
+ + contextName + ".Dir")
+ + File.separator
+ + "chukwa-"
+ + user
+ + "-"
+ + contextName
+ + "-"
+ + System.currentTimeMillis() + ".log";
+
+ // FIXME: Hack to make the log file readable by chukwa user.
+ if (System.getProperty("os.name").intern() == "Linux".intern()) {
+ Runtime.getRuntime().exec("chmod 640 " + logName);
+ }
+ appender.setFile(logName);
+ } else {
+ appender
+ .setFile(metricsLogDir + File.separator + "chukwa-" + user
+ + "-" + contextName + "-" + System.currentTimeMillis()
+ + ".log");
+ }
+ appender.activateOptions();
+ appender.setRecordType(properties
+ .getProperty("log4j.appender.chukwa." + contextName
+ + ".recordType"));
+ appender.setChukwaClientHostname(properties
+ .getProperty("log4j.appender.chukwa." + contextName
+ + ".chukwaClientHostname"));
+ appender.setChukwaClientPortNum(Integer.parseInt(properties
+ .getProperty("log4j.appender.chukwa." + contextName
+ + ".chukwaClientPortNum")));
+ appender.setDatePattern(properties
+ .getProperty("log4j.appender.chukwa." + contextName
+ + ".DatePattern"));
+ logger.addAppender(appender);
+ out = logger;
+ }
+ }
+ }
+
+ JSONObject json = new JSONObject();
try {
- json.put("contextName", contextName);
- json.put("recordName", recordName);
- json.put("chukwa_timestamp", System.currentTimeMillis());
- for (String tagName : outRec.getTagNames()) {
- json.put(tagName, outRec.getTag(tagName));
- }
- for (String metricName : outRec.getMetricNames()) {
- json.put(metricName, outRec.getMetric(metricName));
- }
+ json.put("contextName", contextName);
+ json.put("recordName", recordName);
+ json.put("chukwa_timestamp", System.currentTimeMillis());
+ for (String tagName : outRec.getTagNames()) {
+ json.put(tagName, outRec.getTag(tagName));
+ }
+ for (String metricName : outRec.getMetricNames()) {
+ json.put(metricName, outRec.getMetric(metricName));
+ }
} catch (JSONException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
out.info(json.toString());
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/OneLineLogLayout.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/OneLineLogLayout.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/OneLineLogLayout.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/log4j/OneLineLogLayout.java Wed Mar 11 22:39:26 2009
@@ -18,21 +18,21 @@
package org.apache.hadoop.chukwa.inputtools.log4j;
+
import org.apache.log4j.*;
import org.apache.log4j.spi.LoggingEvent;
public class OneLineLogLayout extends PatternLayout {
-
+
char SEP = ' ';
- public String format(LoggingEvent evt)
- {
-
+
+ public String format(LoggingEvent evt) {
+
String initial_s = super.format(evt);
StringBuilder sb = new StringBuilder();
- for(int i = 0; i < initial_s.length() -1 ; ++i)
- {
+ for (int i = 0; i < initial_s.length() - 1; ++i) {
char c = initial_s.charAt(i);
- if(c == '\n')
+ if (c == '\n')
sb.append(SEP);
else
sb.append(c);
@@ -41,23 +41,21 @@
String[] s = evt.getThrowableStrRep();
if (s != null) {
int len = s.length;
- for(int i = 0; i < len; i++) {
+ for (int i = 0; i < len; i++) {
sb.append(s[i]);
sb.append(SEP);
- }
+ }
}
-
+
sb.append('\n');
return sb.toString();
}
-
- public boolean ignoresThrowable()
- {
+
+ public boolean ignoresThrowable() {
return false;
}
-
- public static void main(String[] args)
- {
+
+ public static void main(String[] args) {
System.setProperty("line.separator", " ");
Logger l = Logger.getRootLogger();
l.removeAllAppenders();
@@ -65,8 +63,7 @@
appender.setName("console");
l.addAppender(appender);
l.warn("testing", new java.io.IOException("just kidding!"));
-
-
+
}
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/DataConfig.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/DataConfig.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/DataConfig.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/DataConfig.java Wed Mar 11 22:39:26 2009
@@ -18,6 +18,7 @@
package org.apache.hadoop.chukwa.inputtools.mdl;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -27,51 +28,55 @@
import java.util.Map;
public class DataConfig {
- private static Configuration config;
- final static String DATACONFIG = "mdl.xml";
- private Log log = LogFactory.getLog(DataConfig.class);
-
- public DataConfig(String path) {
- Path fileResource = new Path(path);
+ private static Configuration config;
+ final static String DATACONFIG = "mdl.xml";
+ private Log log = LogFactory.getLog(DataConfig.class);
+
+ public DataConfig(String path) {
+ Path fileResource = new Path(path);
+ config = new Configuration();
+ config.addResource(fileResource);
+ }
+
+ public DataConfig() {
+ String dataConfig = System.getenv("DATACONFIG");
+ if (dataConfig == null) {
+ dataConfig = DATACONFIG;
+ }
+ log.debug("DATACONFIG=" + dataConfig);
+ if (config == null) {
+ try {
+ Path fileResource = new Path(dataConfig);
config = new Configuration();
config.addResource(fileResource);
+ } catch (Exception e) {
+ log.debug("Error reading configuration file:" + dataConfig);
+ }
}
- public DataConfig() {
- String dataConfig = System.getenv("DATACONFIG");
- if(dataConfig==null) {
- dataConfig=DATACONFIG;
- }
- log.debug("DATACONFIG="+dataConfig);
- if(config==null) {
- try {
- Path fileResource = new Path(dataConfig);
- config = new Configuration();
- config.addResource(fileResource);
- } catch (Exception e) {
- log.debug("Error reading configuration file:"+dataConfig);
- }
- }
- }
+ }
- public String get(String key) {
- return config.get(key);
- }
- public void put(String key, String value) {
- config.set(key, value);
- }
- public Iterator<Map.Entry <String, String>> iterator() {
- return config.iterator();
- }
- public HashMap<String, String> startWith(String key) {
- HashMap<String, String> transformer = new HashMap<String, String>();
- Iterator<Map.Entry <String, String>> entries = config.iterator();
- while(entries.hasNext()) {
- String entry = entries.next().toString();
- if(entry.startsWith(key)) {
- String[] metrics = entry.split("=");
- transformer.put(metrics[0],metrics[1]);
- }
- }
- return transformer;
+ public String get(String key) {
+ return config.get(key);
+ }
+
+ public void put(String key, String value) {
+ config.set(key, value);
+ }
+
+ public Iterator<Map.Entry<String, String>> iterator() {
+ return config.iterator();
+ }
+
+ public HashMap<String, String> startWith(String key) {
+ HashMap<String, String> transformer = new HashMap<String, String>();
+ Iterator<Map.Entry<String, String>> entries = config.iterator();
+ while (entries.hasNext()) {
+ String entry = entries.next().toString();
+ if (entry.startsWith(key)) {
+ String[] metrics = entry.split("=");
+ transformer.put(metrics[0], metrics[1]);
+ }
}
+ return transformer;
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/ErStreamHandler.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/ErStreamHandler.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/ErStreamHandler.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/ErStreamHandler.java Wed Mar 11 22:39:26 2009
@@ -17,48 +17,47 @@
*/
package org.apache.hadoop.chukwa.inputtools.mdl;
+
import java.lang.Thread;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.BufferedReader;
import java.lang.StringBuffer;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-public class ErStreamHandler extends Thread{
- InputStream inpStr;
- String command;
- boolean record;
-
- private static Log log = LogFactory.getLog(ErStreamHandler.class);
-
- public ErStreamHandler(InputStream inpStr,String command,boolean record){
- this.inpStr=inpStr;
- this.command=command;
- this.record=record;
-
- }
-
- public void run(){
- try {
- InputStreamReader inpStrd=new InputStreamReader(inpStr);
- BufferedReader buffRd=new BufferedReader(inpStrd);
- String line=null;
- StringBuffer sb=new StringBuffer();
- while((line=buffRd.readLine())!= null){
- sb.append(line);
- }
- buffRd.close();
-
- if (record && sb.length()>0) {
- log.error(command+" execution error:"+sb.toString());
- }
-
- }catch (Exception e){
- log.error(command+" error:"+e.getMessage());
- }
- }
-
-
+public class ErStreamHandler extends Thread {
+ InputStream inpStr;
+ String command;
+ boolean record;
+
+ private static Log log = LogFactory.getLog(ErStreamHandler.class);
+
+ public ErStreamHandler(InputStream inpStr, String command, boolean record) {
+ this.inpStr = inpStr;
+ this.command = command;
+ this.record = record;
+
+ }
+
+ public void run() {
+ try {
+ InputStreamReader inpStrd = new InputStreamReader(inpStr);
+ BufferedReader buffRd = new BufferedReader(inpStrd);
+ String line = null;
+ StringBuffer sb = new StringBuffer();
+ while ((line = buffRd.readLine()) != null) {
+ sb.append(line);
+ }
+ buffRd.close();
+
+ if (record && sb.length() > 0) {
+ log.error(command + " execution error:" + sb.toString());
+ }
+
+ } catch (Exception e) {
+ log.error(command + " error:" + e.getMessage());
+ }
+ }
+
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/LoaderServer.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/LoaderServer.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/LoaderServer.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/LoaderServer.java Wed Mar 11 22:39:26 2009
@@ -18,80 +18,81 @@
package org.apache.hadoop.chukwa.inputtools.mdl;
+
import java.io.IOException;
import java.io.File;
import java.io.*;
import java.lang.management.ManagementFactory;
import java.nio.channels.*;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
public class LoaderServer {
-
- String name;
- private static Log log = LogFactory.getLog(LoaderServer.class);
- private static FileLock lock = null;
- private static FileOutputStream pidFileOutput = null;
-
- public LoaderServer(String name){
- this.name=name;
- }
-
- public void init() throws IOException{
- String pidLong=ManagementFactory.getRuntimeMXBean().getName();
- String[] items=pidLong.split("@");
- String pid=items[0];
- String chukwaPath=System.getProperty("CHUKWA_HOME");
- StringBuffer pidFilesb=new StringBuffer();
- pidFilesb.append(chukwaPath).append("/var/run/").append(name).append(".pid");
- try{
- File pidFile= new File(pidFilesb.toString());
-
- pidFileOutput= new FileOutputStream(pidFile);
- pidFileOutput.write(pid.getBytes());
- pidFileOutput.flush();
- FileChannel channel = pidFileOutput.getChannel();
- LoaderServer.lock = channel.tryLock();
- if(LoaderServer.lock!=null) {
- log.info("Initlization succeeded...");
- } else {
- throw(new IOException());
- }
- }catch (IOException ex){
- System.out.println("Initializaiton failed: can not write pid file.");
- log.error("Initialization failed...");
- log.error(ex.getMessage());
- System.exit(-1);
- throw ex;
-
- }
-
- }
-
- public void clean(){
- String chukwaPath=System.getenv("CHUKWA_HOME");
- StringBuffer pidFilesb=new StringBuffer();
- pidFilesb.append(chukwaPath).append("/var/run/").append(name).append(".pid");
- String pidFileName=pidFilesb.toString();
-
- File pidFile=new File(pidFileName);
- if (!pidFile.exists()) {
- log.error("Delete pid file, No such file or directory: "+pidFileName);
- } else {
- try {
- lock.release();
- pidFileOutput.close();
- } catch(IOException e) {
- log.error("Unable to release file lock: "+pidFileName);
- }
- }
-
- boolean result=pidFile.delete();
- if (!result){
- log.error("Delete pid file failed, "+pidFileName);
- }
- }
+
+ String name;
+ private static Log log = LogFactory.getLog(LoaderServer.class);
+ private static FileLock lock = null;
+ private static FileOutputStream pidFileOutput = null;
+
+ public LoaderServer(String name) {
+ this.name = name;
+ }
+
+ public void init() throws IOException {
+ String pidLong = ManagementFactory.getRuntimeMXBean().getName();
+ String[] items = pidLong.split("@");
+ String pid = items[0];
+ String chukwaPath = System.getProperty("CHUKWA_HOME");
+ StringBuffer pidFilesb = new StringBuffer();
+ pidFilesb.append(chukwaPath).append("/var/run/").append(name)
+ .append(".pid");
+ try {
+ File pidFile = new File(pidFilesb.toString());
+
+ pidFileOutput = new FileOutputStream(pidFile);
+ pidFileOutput.write(pid.getBytes());
+ pidFileOutput.flush();
+ FileChannel channel = pidFileOutput.getChannel();
+ LoaderServer.lock = channel.tryLock();
+ if (LoaderServer.lock != null) {
+ log.info("Initlization succeeded...");
+ } else {
+ throw (new IOException());
+ }
+ } catch (IOException ex) {
+ System.out.println("Initializaiton failed: can not write pid file.");
+ log.error("Initialization failed...");
+ log.error(ex.getMessage());
+ System.exit(-1);
+ throw ex;
+
+ }
+
+ }
+
+ public void clean() {
+ String chukwaPath = System.getenv("CHUKWA_HOME");
+ StringBuffer pidFilesb = new StringBuffer();
+ pidFilesb.append(chukwaPath).append("/var/run/").append(name)
+ .append(".pid");
+ String pidFileName = pidFilesb.toString();
+
+ File pidFile = new File(pidFileName);
+ if (!pidFile.exists()) {
+ log.error("Delete pid file, No such file or directory: " + pidFileName);
+ } else {
+ try {
+ lock.release();
+ pidFileOutput.close();
+ } catch (IOException e) {
+ log.error("Unable to release file lock: " + pidFileName);
+ }
+ }
+
+ boolean result = pidFile.delete();
+ if (!result) {
+ log.error("Delete pid file failed, " + pidFileName);
+ }
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueDataLoader.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueDataLoader.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueDataLoader.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueDataLoader.java Wed Mar 11 22:39:26 2009
@@ -17,108 +17,102 @@
*/
package org.apache.hadoop.chukwa.inputtools.mdl;
+
import java.lang.Thread;
import java.lang.management.ManagementFactory;
import java.io.FileOutputStream;
import java.sql.SQLException;
import java.io.IOException;
import java.io.File;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import org.apache.hadoop.chukwa.inputtools.mdl.TorqueInfoProcessor;
import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
import org.apache.hadoop.chukwa.util.PidFile;
public class TorqueDataLoader {
- private static Log log = LogFactory.getLog("TorqueDataLoader");
+ private static Log log = LogFactory.getLog("TorqueDataLoader");
+
+ private TorqueInfoProcessor tp = null;
+ private PidFile loader = null;
+
+ public TorqueDataLoader(DataConfig mdlConfig, int interval) {
+ log.info("in torqueDataLoader");
+ tp = new TorqueInfoProcessor(mdlConfig, interval);
+ loader = new PidFile("TorqueDataLoader");
+ }
+
+ public void run() {
+ boolean first = true;
+ while (true) {
+ try {
+ tp.setup(first);
+ first = false;
+ } catch (Exception ex) {
+ tp.shutdown();
+
+ if (first) {
+ log.error("setup error");
+ ex.printStackTrace();
+ loader.clean(); // only call before system.exit()
+ System.exit(1);
+ }
+ log.error("setup fail, retry after 10 minutes");
+ try {
+ Thread.sleep(600 * 1000);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ log.error(e.getMessage());
+ // e.printStackTrace();
+ }
+ continue;
+
+ }
+
+ try {
+ tp.run_forever();
+ } catch (SQLException ex) {
+ tp.shutdown();
+ log.error("processor died, reconnect again after 10 minutes");
+ ex.printStackTrace();
+ try {
+ Thread.sleep(600 * 1000);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ log.error(e.getMessage());
+ // e.printStackTrace();
+ }
+ } catch (Exception ex) {
+ try {
+ Thread.sleep(16 * 1000);
+ } catch (InterruptedException e) {
+ ;
+ }
+ tp.shutdown();
+ log.error("process died...." + ex.getMessage());
+ loader.clean();
+ System.exit(1);
+ }
+
+ }// while
+
+ }
+
+ public static void main(String[] args) {
+ /*
+ * if (args.length < 2 || args[0].startsWith("-h") ||
+ * args[0].startsWith("--h")) {
+ * System.out.println("Usage: UtilDataLoader interval(sec)");
+ * System.exit(1);puvw-./chij } String interval = args[0]; int
+ * intervalValue=Integer.parseInt(interval);
+ */
+ int intervalValue = 60;
+
+ DataConfig mdlConfig = new DataConfig();
+
+ TorqueDataLoader tdl = new TorqueDataLoader(mdlConfig, intervalValue);
+ tdl.run();
- private TorqueInfoProcessor tp=null;
- private PidFile loader=null;
-
-
- public TorqueDataLoader (DataConfig mdlConfig, int interval){
- log.info("in torqueDataLoader");
- tp = new TorqueInfoProcessor(mdlConfig, interval);
- loader=new PidFile("TorqueDataLoader");
- }
-
-
- public void run(){
- boolean first=true;
- while(true){
- try{
- tp.setup(first);
- first=false;
- }catch (Exception ex){
- tp.shutdown();
-
- if (first){
- log.error("setup error");
- ex.printStackTrace();
- loader.clean(); // only call before system.exit()
- System.exit(1);
- }
- log.error("setup fail, retry after 10 minutes");
- try {
- Thread.sleep(600*1000);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- log.error(e.getMessage());
- // e.printStackTrace();
- }
- continue;
-
- }
-
- try{
- tp.run_forever();
- }catch (SQLException ex) {
- tp.shutdown();
- log.error("processor died, reconnect again after 10 minutes");
- ex.printStackTrace();
- try {
- Thread.sleep(600*1000);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- log.error(e.getMessage());
- // e.printStackTrace();
- }
- }catch (Exception ex){
- try {
- Thread.sleep(16*1000);
- } catch (InterruptedException e) {
- ;
- }
- tp.shutdown();
- log.error("process died...."+ex.getMessage());
- loader.clean();
- System.exit(1);
- }
-
- }//while
-
- }
-
-
- public static void main(String[] args) {
- /* if (args.length < 2 || args[0].startsWith("-h")
- || args[0].startsWith("--h")) {
- System.out.println("Usage: UtilDataLoader interval(sec)");
- System.exit(1);puvw-./chij
- }
- String interval = args[0];
- int intervalValue=Integer.parseInt(interval);
- */
- int intervalValue=60;
-
-
- DataConfig mdlConfig=new DataConfig();
-
- TorqueDataLoader tdl = new TorqueDataLoader(mdlConfig, intervalValue);
- tdl.run();
+ }
- }
-
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java Wed Mar 11 22:39:26 2009
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.chukwa.inputtools.mdl;
+
import java.sql.SQLException;
import java.sql.ResultSet;
import java.lang.Exception;
@@ -40,7 +41,6 @@
import java.lang.InterruptedException;
import java.lang.System;
import java.util.Date;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
@@ -48,439 +48,440 @@
import org.apache.hadoop.chukwa.inputtools.mdl.ErStreamHandler;
import org.apache.hadoop.chukwa.util.DatabaseWriter;
-
public class TorqueInfoProcessor {
-
- private static Log log = LogFactory.getLog(TorqueInfoProcessor.class);
-
- private int intervalValue=60;
- private String torqueServer = null;
- private String torqueBinDir= null;
- private String domain = null;
-
- private TreeMap <String,TreeMap<String,String>> currentHodJobs;
-
-
- public TorqueInfoProcessor(DataConfig mdlConfig, int interval){
- this.intervalValue=interval;
-
- torqueServer=System.getProperty("TORQUE_SERVER");
- torqueBinDir=System.getProperty("TORQUE_HOME")+File.separator+"bin";
- domain=System.getProperty("DOMAIN");
- currentHodJobs=new TreeMap<String,TreeMap<String,String>>();
- }
-
-
-
- public void setup(boolean recover)throws Exception {
- }
-
- private void getHodJobInfo() throws IOException {
- StringBuffer sb=new StringBuffer();
- sb.append(torqueBinDir).append("/qstat -a");
-
- String[] getQueueInfoCommand=new String [3];
- getQueueInfoCommand[0]="ssh";
- getQueueInfoCommand[1]=torqueServer;
- getQueueInfoCommand[2]=sb.toString();
-
-
- String command=getQueueInfoCommand[0]+" "+getQueueInfoCommand[1]+" "+getQueueInfoCommand[2];
- ProcessBuilder pb= new ProcessBuilder(getQueueInfoCommand);
-
- Process p=pb.start();
-
- Timer timeout=new Timer();
- TorqueTimerTask torqueTimerTask=new TorqueTimerTask(p, command);
- timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval*1000);
-
- BufferedReader result = new BufferedReader (new InputStreamReader (p.getInputStream()));
- ErStreamHandler errorHandler=new ErStreamHandler(p.getErrorStream(),command,true);
- errorHandler.start();
-
- String line = null;
- boolean start=false;
- TreeSet<String> jobsInTorque=new TreeSet<String>();
- while((line=result.readLine())!=null){
- if (line.startsWith("---")){
- start=true;
- continue;
- }
-
- if(start){
- String [] items=line.split("\\s+");
- if (items.length>=10){
- String hodIdLong=items[0];
- String hodId=hodIdLong.split("[.]")[0];
- String userId=items[1];
- String numOfMachine=items[5];
- String status=items[9];
- jobsInTorque.add(hodId);
- if (!currentHodJobs.containsKey(hodId)) {
- TreeMap <String,String> aJobData=new TreeMap <String,String>();
-
- aJobData.put("userId", userId);
- aJobData.put("numOfMachine",numOfMachine);
- aJobData.put("traceCheckCount","0");
- aJobData.put("process", "0");
- aJobData.put("status",status);
- currentHodJobs.put(hodId,aJobData);
- }else {
- TreeMap <String,String> aJobData= currentHodJobs.get(hodId);
- aJobData.put("status", status);
- currentHodJobs.put(hodId,aJobData);
- }//if..else
- }
- }
- }//while
-
- try {
- errorHandler.join();
- }catch (InterruptedException ie){
- log.error(ie.getMessage());
- }
- timeout.cancel();
-
- Set<String> currentHodJobIds=currentHodJobs.keySet();
- Iterator<String> currentHodJobIdsIt=currentHodJobIds.iterator();
- TreeSet<String> finishedHodIds=new TreeSet<String>();
- while (currentHodJobIdsIt.hasNext()){
- String hodId=currentHodJobIdsIt.next();
- if (!jobsInTorque.contains(hodId)) {
- TreeMap <String,String> aJobData=currentHodJobs.get(hodId);
- String process=aJobData.get("process");
- if (process.equals("0") || process.equals("1")) {
- aJobData.put("status", "C");
- }else {
- finishedHodIds.add(hodId);
- }
- }
- }//while
-
- Iterator<String >finishedHodIdsIt=finishedHodIds.iterator();
- while (finishedHodIdsIt.hasNext()){
- String hodId=finishedHodIdsIt.next();
- currentHodJobs.remove(hodId);
- }
-
- }
-
- private boolean loadQstatData(String hodId) throws IOException, SQLException {
- TreeMap<String,String> aJobData=currentHodJobs.get(hodId);
- String userId=aJobData.get("userId");
-
- StringBuffer sb=new StringBuffer();
- sb.append(torqueBinDir).append("/qstat -f -1 ").append(hodId);
- String[] qstatCommand=new String [3];
- qstatCommand[0]="ssh";
- qstatCommand[1]=torqueServer;
- qstatCommand[2]=sb.toString();
-
- String command=qstatCommand[0]+" "+qstatCommand[1]+" "+qstatCommand[2];
- ProcessBuilder pb= new ProcessBuilder(qstatCommand);
- Process p=pb.start();
-
- Timer timeout=new Timer();
- TorqueTimerTask torqueTimerTask=new TorqueTimerTask(p, command);
- timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval*1000);
-
- BufferedReader result = new BufferedReader (new InputStreamReader (p.getInputStream()));
- ErStreamHandler errorHandler=new ErStreamHandler(p.getErrorStream(),command,false);
- errorHandler.start();
- String line=null;
- String hosts=null;
- long startTimeValue=-1;
- long endTimeValue=Calendar.getInstance().getTimeInMillis();
- long executeTimeValue=Calendar.getInstance().getTimeInMillis();
- boolean qstatfinished;
-
- while((line=result.readLine())!=null){
- if (line.indexOf("ctime")>=0){
- String startTime=line.split("=")[1].trim();
- //Tue Sep 9 23:44:29 2008
- SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
- Date startTimeDate;
- try {
- startTimeDate = sdf.parse(startTime);
- startTimeValue=startTimeDate.getTime();
- } catch (ParseException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- }
- if (line.indexOf("mtime")>=0){
- String endTime=line.split("=")[1].trim();
- SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
- Date endTimeDate;
- try {
- endTimeDate = sdf.parse(endTime);
- endTimeValue=endTimeDate.getTime();
- } catch (ParseException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- }
- if (line.indexOf("etime")>=0){
- String executeTime=line.split("=")[1].trim();
- SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
- Date executeTimeDate;
- try {
- executeTimeDate = sdf.parse(executeTime);
- executeTimeValue=executeTimeDate.getTime();
- } catch (ParseException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- }
- if (line.indexOf("exec_host")>=0){
- hosts=line.split("=")[1].trim();
- }
- }
-
- if (hosts!=null && startTimeValue>=0) {
- String [] items2=hosts.split("[+]");
- int num=0;
- for (int i=0;i<items2.length;i++) {
- String machinetmp=items2[i];
- if( machinetmp.length()>3){
- String machine=items2[i].substring(0,items2[i].length()-2);
- StringBuffer data=new StringBuffer();
- data.append("HodId=").append(hodId);
- data.append(", Machine=").append(machine);
- if(domain!=null) {
- data.append(".").append(domain);
- }
- log.info(data);
- num++;
- }
- }
- Timestamp startTimedb=new Timestamp(startTimeValue);
- Timestamp endTimedb=new Timestamp(endTimeValue);
- StringBuffer data=new StringBuffer();
- long timeQueued=executeTimeValue-startTimeValue;
- data.append("HodID=").append(hodId);
- data.append(", UserId=").append(userId);
- data.append(", StartTime=").append(startTimedb);
- data.append(", TimeQueued=").append(timeQueued);
- data.append(", NumOfMachines=").append(num);
- data.append(", EndTime=").append(endTimedb);
- log.info(data);
- qstatfinished=true;
-
- } else{
-
- qstatfinished=false;
+
+ private static Log log = LogFactory.getLog(TorqueInfoProcessor.class);
+
+ private int intervalValue = 60;
+ private String torqueServer = null;
+ private String torqueBinDir = null;
+ private String domain = null;
+
+ private TreeMap<String, TreeMap<String, String>> currentHodJobs;
+
+ public TorqueInfoProcessor(DataConfig mdlConfig, int interval) {
+ this.intervalValue = interval;
+
+ torqueServer = System.getProperty("TORQUE_SERVER");
+ torqueBinDir = System.getProperty("TORQUE_HOME") + File.separator + "bin";
+ domain = System.getProperty("DOMAIN");
+ currentHodJobs = new TreeMap<String, TreeMap<String, String>>();
+ }
+
+ public void setup(boolean recover) throws Exception {
+ }
+
+ private void getHodJobInfo() throws IOException {
+ StringBuffer sb = new StringBuffer();
+ sb.append(torqueBinDir).append("/qstat -a");
+
+ String[] getQueueInfoCommand = new String[3];
+ getQueueInfoCommand[0] = "ssh";
+ getQueueInfoCommand[1] = torqueServer;
+ getQueueInfoCommand[2] = sb.toString();
+
+ String command = getQueueInfoCommand[0] + " " + getQueueInfoCommand[1]
+ + " " + getQueueInfoCommand[2];
+ ProcessBuilder pb = new ProcessBuilder(getQueueInfoCommand);
+
+ Process p = pb.start();
+
+ Timer timeout = new Timer();
+ TorqueTimerTask torqueTimerTask = new TorqueTimerTask(p, command);
+ timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval * 1000);
+
+ BufferedReader result = new BufferedReader(new InputStreamReader(p
+ .getInputStream()));
+ ErStreamHandler errorHandler = new ErStreamHandler(p.getErrorStream(),
+ command, true);
+ errorHandler.start();
+
+ String line = null;
+ boolean start = false;
+ TreeSet<String> jobsInTorque = new TreeSet<String>();
+ while ((line = result.readLine()) != null) {
+ if (line.startsWith("---")) {
+ start = true;
+ continue;
+ }
+
+ if (start) {
+ String[] items = line.split("\\s+");
+ if (items.length >= 10) {
+ String hodIdLong = items[0];
+ String hodId = hodIdLong.split("[.]")[0];
+ String userId = items[1];
+ String numOfMachine = items[5];
+ String status = items[9];
+ jobsInTorque.add(hodId);
+ if (!currentHodJobs.containsKey(hodId)) {
+ TreeMap<String, String> aJobData = new TreeMap<String, String>();
+
+ aJobData.put("userId", userId);
+ aJobData.put("numOfMachine", numOfMachine);
+ aJobData.put("traceCheckCount", "0");
+ aJobData.put("process", "0");
+ aJobData.put("status", status);
+ currentHodJobs.put(hodId, aJobData);
+ } else {
+ TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
+ aJobData.put("status", status);
+ currentHodJobs.put(hodId, aJobData);
+ }// if..else
+ }
+ }
+ }// while
+
+ try {
+ errorHandler.join();
+ } catch (InterruptedException ie) {
+ log.error(ie.getMessage());
+ }
+ timeout.cancel();
+
+ Set<String> currentHodJobIds = currentHodJobs.keySet();
+ Iterator<String> currentHodJobIdsIt = currentHodJobIds.iterator();
+ TreeSet<String> finishedHodIds = new TreeSet<String>();
+ while (currentHodJobIdsIt.hasNext()) {
+ String hodId = currentHodJobIdsIt.next();
+ if (!jobsInTorque.contains(hodId)) {
+ TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
+ String process = aJobData.get("process");
+ if (process.equals("0") || process.equals("1")) {
+ aJobData.put("status", "C");
+ } else {
+ finishedHodIds.add(hodId);
+ }
+ }
+ }// while
+
+ Iterator<String> finishedHodIdsIt = finishedHodIds.iterator();
+ while (finishedHodIdsIt.hasNext()) {
+ String hodId = finishedHodIdsIt.next();
+ currentHodJobs.remove(hodId);
+ }
+
+ }
+
+ private boolean loadQstatData(String hodId) throws IOException, SQLException {
+ TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
+ String userId = aJobData.get("userId");
+
+ StringBuffer sb = new StringBuffer();
+ sb.append(torqueBinDir).append("/qstat -f -1 ").append(hodId);
+ String[] qstatCommand = new String[3];
+ qstatCommand[0] = "ssh";
+ qstatCommand[1] = torqueServer;
+ qstatCommand[2] = sb.toString();
+
+ String command = qstatCommand[0] + " " + qstatCommand[1] + " "
+ + qstatCommand[2];
+ ProcessBuilder pb = new ProcessBuilder(qstatCommand);
+ Process p = pb.start();
+
+ Timer timeout = new Timer();
+ TorqueTimerTask torqueTimerTask = new TorqueTimerTask(p, command);
+ timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval * 1000);
+
+ BufferedReader result = new BufferedReader(new InputStreamReader(p
+ .getInputStream()));
+ ErStreamHandler errorHandler = new ErStreamHandler(p.getErrorStream(),
+ command, false);
+ errorHandler.start();
+ String line = null;
+ String hosts = null;
+ long startTimeValue = -1;
+ long endTimeValue = Calendar.getInstance().getTimeInMillis();
+ long executeTimeValue = Calendar.getInstance().getTimeInMillis();
+ boolean qstatfinished;
+
+ while ((line = result.readLine()) != null) {
+ if (line.indexOf("ctime") >= 0) {
+ String startTime = line.split("=")[1].trim();
+ // Tue Sep 9 23:44:29 2008
+ SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
+ Date startTimeDate;
+ try {
+ startTimeDate = sdf.parse(startTime);
+ startTimeValue = startTimeDate.getTime();
+ } catch (ParseException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ }
+ if (line.indexOf("mtime") >= 0) {
+ String endTime = line.split("=")[1].trim();
+ SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
+ Date endTimeDate;
+ try {
+ endTimeDate = sdf.parse(endTime);
+ endTimeValue = endTimeDate.getTime();
+ } catch (ParseException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ }
+ if (line.indexOf("etime") >= 0) {
+ String executeTime = line.split("=")[1].trim();
+ SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
+ Date executeTimeDate;
+ try {
+ executeTimeDate = sdf.parse(executeTime);
+ executeTimeValue = executeTimeDate.getTime();
+ } catch (ParseException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ }
+ if (line.indexOf("exec_host") >= 0) {
+ hosts = line.split("=")[1].trim();
+ }
+ }
+
+ if (hosts != null && startTimeValue >= 0) {
+ String[] items2 = hosts.split("[+]");
+ int num = 0;
+ for (int i = 0; i < items2.length; i++) {
+ String machinetmp = items2[i];
+ if (machinetmp.length() > 3) {
+ String machine = items2[i].substring(0, items2[i].length() - 2);
+ StringBuffer data = new StringBuffer();
+ data.append("HodId=").append(hodId);
+ data.append(", Machine=").append(machine);
+ if (domain != null) {
+ data.append(".").append(domain);
}
-
- try {
- errorHandler.join();
- }catch (InterruptedException ie){
- log.error(ie.getMessage());
+ log.info(data);
+ num++;
+ }
+ }
+ Timestamp startTimedb = new Timestamp(startTimeValue);
+ Timestamp endTimedb = new Timestamp(endTimeValue);
+ StringBuffer data = new StringBuffer();
+ long timeQueued = executeTimeValue - startTimeValue;
+ data.append("HodID=").append(hodId);
+ data.append(", UserId=").append(userId);
+ data.append(", StartTime=").append(startTimedb);
+ data.append(", TimeQueued=").append(timeQueued);
+ data.append(", NumOfMachines=").append(num);
+ data.append(", EndTime=").append(endTimedb);
+ log.info(data);
+ qstatfinished = true;
+
+ } else {
+
+ qstatfinished = false;
+ }
+
+ try {
+ errorHandler.join();
+ } catch (InterruptedException ie) {
+ log.error(ie.getMessage());
+ }
+ result.close();
+ timeout.cancel();
+
+ return qstatfinished;
+ }
+
+ private boolean loadTraceJobData(String hodId) throws IOException,
+ SQLException {
+ TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
+ String userId = aJobData.get("userId");
+ String process = aJobData.get("process");
+
+ StringBuffer sb = new StringBuffer();
+ sb.append(torqueBinDir).append("/tracejob -n 10 -l -m -s ").append(hodId);
+ String[] traceJobCommand = new String[3];
+ traceJobCommand[0] = "ssh";
+ traceJobCommand[1] = torqueServer;
+ traceJobCommand[2] = sb.toString();
+
+ String command = traceJobCommand[0] + " " + traceJobCommand[1] + " "
+ + traceJobCommand[2];
+ ProcessBuilder pb = new ProcessBuilder(traceJobCommand);
+
+ Process p = pb.start();
+
+ Timer timeout = new Timer();
+ TorqueTimerTask torqueTimerTask = new TorqueTimerTask(p, command);
+ timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval * 1000);
+
+ BufferedReader result = new BufferedReader(new InputStreamReader(p
+ .getInputStream()));
+ ErStreamHandler errorHandler = new ErStreamHandler(p.getErrorStream(),
+ command, false);
+ errorHandler.start();
+ String line = null;
+ String exit_status = null;
+ String hosts = null;
+ long timeQueued = -1;
+ long startTimeValue = -1;
+ long endTimeValue = -1;
+ boolean findResult = false;
+
+ while ((line = result.readLine()) != null && !findResult) {
+ if (line.indexOf("end") >= 0 && line.indexOf("Exit_status") >= 0
+ && line.indexOf("qtime") >= 0) {
+ TreeMap<String, String> jobData = new TreeMap<String, String>();
+ String[] items = line.split("\\s+");
+ for (int i = 0; i < items.length; i++) {
+ String[] items2 = items[i].split("=");
+ if (items2.length >= 2) {
+ jobData.put(items2[0], items2[1]);
}
- result.close();
- timeout.cancel();
-
- return qstatfinished;
- }
-
-
- private boolean loadTraceJobData(String hodId) throws IOException,SQLException{
- TreeMap<String,String> aJobData=currentHodJobs.get(hodId);
- String userId=aJobData.get("userId");
- String process=aJobData.get("process");
-
- StringBuffer sb=new StringBuffer();
- sb.append(torqueBinDir).append("/tracejob -n 10 -l -m -s ").append(hodId);
- String[] traceJobCommand=new String [3];
- traceJobCommand[0]="ssh";
- traceJobCommand[1]=torqueServer;
- traceJobCommand[2]=sb.toString();
-
- String command=traceJobCommand[0]+" "+traceJobCommand[1]+" "+traceJobCommand[2];
- ProcessBuilder pb= new ProcessBuilder(traceJobCommand);
-
- Process p=pb.start();
-
- Timer timeout=new Timer();
- TorqueTimerTask torqueTimerTask=new TorqueTimerTask(p, command);
- timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval*1000);
-
- BufferedReader result = new BufferedReader (new InputStreamReader (p.getInputStream()));
- ErStreamHandler errorHandler=new ErStreamHandler(p.getErrorStream(),command,false);
- errorHandler.start();
- String line=null;
- String exit_status=null;
- String hosts=null;
- long timeQueued=-1;
- long startTimeValue=-1;
- long endTimeValue=-1;
- boolean findResult=false;
-
-
- while((line=result.readLine())!=null&& ! findResult){
- if (line.indexOf("end")>=0 &&line.indexOf("Exit_status")>=0 && line.indexOf("qtime")>=0){
- TreeMap <String,String> jobData=new TreeMap <String,String>() ;
- String [] items=line.split("\\s+");
- for (int i=0;i<items.length; i++) {
- String [] items2 = items[i].split("=");
- if (items2.length>=2){
- jobData.put(items2[0], items2[1]);
- }
-
- }
- String startTime=jobData.get("ctime");
- startTimeValue=Long.valueOf(startTime);
- startTimeValue=startTimeValue-startTimeValue%(60);
- Timestamp startTimedb=new Timestamp(startTimeValue*1000);
-
- String queueTime=jobData.get("qtime");
- long queueTimeValue=Long.valueOf(queueTime);
-
- String sTime=jobData.get("start");
- long sTimeValue=Long.valueOf(sTime);
-
- timeQueued=sTimeValue-queueTimeValue;
-
- String endTime=jobData.get("end");
- endTimeValue=Long.valueOf(endTime);
- endTimeValue=endTimeValue-endTimeValue%(60);
- Timestamp endTimedb=new Timestamp(endTimeValue*1000);
-
- exit_status=jobData.get("Exit_status");
- hosts=jobData.get("exec_host");
- String [] items2=hosts.split("[+]");
- int num=0;
- for (int i=0;i<items2.length;i++) {
- String machinetemp=items2[i];
- if (machinetemp.length()>=3){
- String machine=items2[i].substring(0,items2[i].length()-2);
- StringBuffer data=new StringBuffer();
- data.append("HodId=").append(hodId);
- data.append(", Machine=").append(machine);
- if(domain!=null) {
- data.append(".").append(domain);
- }
- log.info(data.toString());
- num++;
- }
- }
-
- StringBuffer data=new StringBuffer();
- data.append("HodID=").append(hodId);
- data.append(", UserId=").append(userId);
- data.append(", Status=").append(exit_status);
- data.append(", TimeQueued=").append(timeQueued);
- data.append(", StartTime=").append(startTimedb);
- data.append(", EndTime=").append(endTimedb);
- data.append(", NumOfMachines=").append(num);
- log.info(data.toString());
- findResult=true;
- log.debug(" hod info for job "+hodId+" has been loaded ");
- }//if
-
- }//while
-
- try {
- errorHandler.join();
- }catch (InterruptedException ie){
- log.error(ie.getMessage());
- }
-
- timeout.cancel();
- boolean tracedone=false;
- if (!findResult){
-
- String traceCheckCount=aJobData.get("traceCheckCount");
- int traceCheckCountValue=Integer.valueOf(traceCheckCount);
- traceCheckCountValue=traceCheckCountValue+1;
- aJobData.put("traceCheckCount",String.valueOf(traceCheckCountValue));
-
-
- log.debug("did not find tracejob info for job "+hodId+", after "+traceCheckCountValue+" times checking");
- if (traceCheckCountValue>=2){
- tracedone= true;
+
+ }
+ String startTime = jobData.get("ctime");
+ startTimeValue = Long.valueOf(startTime);
+ startTimeValue = startTimeValue - startTimeValue % (60);
+ Timestamp startTimedb = new Timestamp(startTimeValue * 1000);
+
+ String queueTime = jobData.get("qtime");
+ long queueTimeValue = Long.valueOf(queueTime);
+
+ String sTime = jobData.get("start");
+ long sTimeValue = Long.valueOf(sTime);
+
+ timeQueued = sTimeValue - queueTimeValue;
+
+ String endTime = jobData.get("end");
+ endTimeValue = Long.valueOf(endTime);
+ endTimeValue = endTimeValue - endTimeValue % (60);
+ Timestamp endTimedb = new Timestamp(endTimeValue * 1000);
+
+ exit_status = jobData.get("Exit_status");
+ hosts = jobData.get("exec_host");
+ String[] items2 = hosts.split("[+]");
+ int num = 0;
+ for (int i = 0; i < items2.length; i++) {
+ String machinetemp = items2[i];
+ if (machinetemp.length() >= 3) {
+ String machine = items2[i].substring(0, items2[i].length() - 2);
+ StringBuffer data = new StringBuffer();
+ data.append("HodId=").append(hodId);
+ data.append(", Machine=").append(machine);
+ if (domain != null) {
+ data.append(".").append(domain);
}
+ log.info(data.toString());
+ num++;
+ }
}
- boolean finished=findResult|tracedone;
- return finished;
- }
-
-
-
- private void process_data() throws SQLException{
-
- long currentTime=System.currentTimeMillis();
- currentTime=currentTime-currentTime%(60*1000);
- Timestamp timestamp=new Timestamp(currentTime);
-
- Set<String> hodIds=currentHodJobs.keySet();
-
- Iterator<String> hodIdsIt=hodIds.iterator();
- while (hodIdsIt.hasNext()){
- String hodId=(String) hodIdsIt.next();
- TreeMap<String,String> aJobData=currentHodJobs.get(hodId);
- String status=aJobData.get("status");
- String process=aJobData.get("process");
- if (process.equals("0") && (status.equals("R") ||status.equals("E"))){
- try {
- boolean result=loadQstatData(hodId);
- if (result){
- aJobData.put("process","1");
- currentHodJobs.put(hodId, aJobData);
- }
- }catch (IOException ioe){
- log.error("load qsat data Error:"+ioe.getMessage());
-
- }
- }
- if (! process.equals("2") && status.equals("C")){
- try {
- boolean result=loadTraceJobData(hodId);
-
- if (result){
- aJobData.put("process","2");
- currentHodJobs.put(hodId, aJobData);
- }
- }catch (IOException ioe){
- log.error("loadTraceJobData Error:"+ioe.getMessage());
- }
- }//if
-
-
- } //while
-
- }
-
- private void handle_jobData() throws SQLException{
- try {
- getHodJobInfo();
- }catch (IOException ex){
- log.error("getQueueInfo Error:"+ex.getMessage());
- return;
- }
- try {
- process_data();
- } catch (SQLException ex){
- log.error("process_data Error:"+ex.getMessage());
- throw ex;
- }
- }
-
- public void run_forever() throws SQLException{
- while(true){
- handle_jobData();
- try {
- log.debug("sleeping ...");
- Thread.sleep(this.intervalValue*1000);
- } catch (InterruptedException e) {
- log.error(e.getMessage());
- }
+
+ StringBuffer data = new StringBuffer();
+ data.append("HodID=").append(hodId);
+ data.append(", UserId=").append(userId);
+ data.append(", Status=").append(exit_status);
+ data.append(", TimeQueued=").append(timeQueued);
+ data.append(", StartTime=").append(startTimedb);
+ data.append(", EndTime=").append(endTimedb);
+ data.append(", NumOfMachines=").append(num);
+ log.info(data.toString());
+ findResult = true;
+ log.debug(" hod info for job " + hodId + " has been loaded ");
+ }// if
+
+ }// while
+
+ try {
+ errorHandler.join();
+ } catch (InterruptedException ie) {
+ log.error(ie.getMessage());
+ }
+
+ timeout.cancel();
+ boolean tracedone = false;
+ if (!findResult) {
+
+ String traceCheckCount = aJobData.get("traceCheckCount");
+ int traceCheckCountValue = Integer.valueOf(traceCheckCount);
+ traceCheckCountValue = traceCheckCountValue + 1;
+ aJobData.put("traceCheckCount", String.valueOf(traceCheckCountValue));
+
+ log.debug("did not find tracejob info for job " + hodId + ", after "
+ + traceCheckCountValue + " times checking");
+ if (traceCheckCountValue >= 2) {
+ tracedone = true;
+ }
+ }
+ boolean finished = findResult | tracedone;
+ return finished;
+ }
+
+ private void process_data() throws SQLException {
+
+ long currentTime = System.currentTimeMillis();
+ currentTime = currentTime - currentTime % (60 * 1000);
+ Timestamp timestamp = new Timestamp(currentTime);
+
+ Set<String> hodIds = currentHodJobs.keySet();
+
+ Iterator<String> hodIdsIt = hodIds.iterator();
+ while (hodIdsIt.hasNext()) {
+ String hodId = (String) hodIdsIt.next();
+ TreeMap<String, String> aJobData = currentHodJobs.get(hodId);
+ String status = aJobData.get("status");
+ String process = aJobData.get("process");
+ if (process.equals("0") && (status.equals("R") || status.equals("E"))) {
+ try {
+ boolean result = loadQstatData(hodId);
+ if (result) {
+ aJobData.put("process", "1");
+ currentHodJobs.put(hodId, aJobData);
}
- }
+ } catch (IOException ioe) {
+ log.error("load qsat data Error:" + ioe.getMessage());
+
+ }
+ }
+ if (!process.equals("2") && status.equals("C")) {
+ try {
+ boolean result = loadTraceJobData(hodId);
+
+ if (result) {
+ aJobData.put("process", "2");
+ currentHodJobs.put(hodId, aJobData);
+ }
+ } catch (IOException ioe) {
+ log.error("loadTraceJobData Error:" + ioe.getMessage());
+ }
+ }// if
+
+ } // while
+
+ }
+
+ private void handle_jobData() throws SQLException {
+ try {
+ getHodJobInfo();
+ } catch (IOException ex) {
+ log.error("getQueueInfo Error:" + ex.getMessage());
+ return;
+ }
+ try {
+ process_data();
+ } catch (SQLException ex) {
+ log.error("process_data Error:" + ex.getMessage());
+ throw ex;
+ }
+ }
+
+ public void run_forever() throws SQLException {
+ while (true) {
+ handle_jobData();
+ try {
+ log.debug("sleeping ...");
+ Thread.sleep(this.intervalValue * 1000);
+ } catch (InterruptedException e) {
+ log.error(e.getMessage());
+ }
+ }
+ }
- public void shutdown(){
- }
+ public void shutdown() {
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java Wed Mar 11 22:39:26 2009
@@ -17,34 +17,35 @@
*/
package org.apache.hadoop.chukwa.inputtools.mdl;
+
import java.util.TimerTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-public class TorqueTimerTask extends TimerTask{
- private Process ps=null;
- private String command;
-
- private static Log log = LogFactory.getLog(TorqueTimerTask.class);
- //public static int timeoutInterval=300;
- public static int timeoutInterval=180;
-
- public TorqueTimerTask() {
- super();
- // TODO Auto-generated constructor stub
- }
-
- public TorqueTimerTask(Process process,String command){
- super();
- this.ps=process;
- this.command=command;
-
- }
-
- public void run() {
- ps.destroy();
- log.error("torque command: "+command+" timed out");
-
- }
+public class TorqueTimerTask extends TimerTask {
+ private Process ps = null;
+ private String command;
+
+ private static Log log = LogFactory.getLog(TorqueTimerTask.class);
+ // public static int timeoutInterval=300;
+ public static int timeoutInterval = 180;
+
+ public TorqueTimerTask() {
+ super();
+ // TODO Auto-generated constructor stub
+ }
+
+ public TorqueTimerTask(Process process, String command) {
+ super();
+ this.ps = process;
+ this.command = command;
+
+ }
+
+ public void run() {
+ ps.destroy();
+ log.error("torque command: " + command + " timed out");
+
+ }
}