You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/10/31 19:57:06 UTC
svn commit: r709533 [2/2] - in /hadoop/core/trunk: ./ src/contrib/chukwa/
src/contrib/chukwa/bin/ src/contrib/chukwa/conf/
src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/
src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/...
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java?rev=709533&r1=709532&r2=709533&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java Fri Oct 31 11:57:04 2008
@@ -9,23 +9,23 @@
package org.apache.hadoop.chukwa.inputtools.log4j;
-import java.io.IOException;
import java.io.File;
+import java.io.IOException;
import java.text.SimpleDateFormat;
+import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
-import java.util.Calendar;
-import java.util.TimeZone;
import java.util.Locale;
+import java.util.TimeZone;
+import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
+import org.apache.hadoop.chukwa.util.RecordConstants;
import org.apache.log4j.FileAppender;
import org.apache.log4j.Layout;
+import org.apache.log4j.Logger;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LoggingEvent;
-import org.apache.hadoop.chukwa.util.RecordConstants;
-import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
-
/**
ChukwaDailyRollingFileAppender is a slightly modified version of
DailyRollingFileAppender, with modified versions of its
@@ -129,14 +129,13 @@
<p>Do not use the colon ":" character in anywhere in the
<b>DatePattern</b> option. The text before the colon is interpeted
as the protocol specificaion of a URL which is probably not what
- you want.
+ you want.
+*/
- @author Eirik Lygre
- @author Ceki Gülcü */
public class ChukwaDailyRollingFileAppender extends FileAppender {
-
+ static Logger log = Logger.getLogger(ChukwaDailyRollingFileAppender.class);
// The code assumes that the following constants are in a increasing
// sequence.
static final int TOP_OF_TROUBLE=-1;
@@ -149,6 +148,9 @@
static final String adaptorType = ChukwaAgentController.CharFileTailUTF8NewLineEscaped;
+ static final Object lock = new Object();
+ static String lastRotation = "";
+
/**
The date pattern. By default, the pattern is set to
"'.'yyyy-MM-dd" meaning daily rollover.
@@ -180,6 +182,9 @@
int checkPeriod = TOP_OF_TROUBLE;
ChukwaAgentController chukwaClient;
+ boolean chukwaClientIsNull = true;
+ static final Object chukwaLock = new Object();
+
String chukwaClientHostname;
int chukwaClientPortNum;
long chukwaClientConnectNumRetry;
@@ -203,7 +208,7 @@
/**
Instantiate a <code>DailyRollingFileAppender</code> and open the
file designated by <code>filename</code>. The opened filename will
- become the ouput destination for this appender.
+ become the output destination for this appender.
*/
public ChukwaDailyRollingFileAppender (Layout layout, String filename,
@@ -336,12 +341,10 @@
return;
}
+
// close current file, and rename it to datedFilename
this.closeFile();
- if (chukwaClient != null){
- chukwaClient.pauseFile(getRecordType(),fileName);
- }
File target = new File(scheduledFilename);
if (target.exists()) {
@@ -363,19 +366,44 @@
}
catch(IOException e) {
errorHandler.error("setFile("+fileName+", false) call failed.");
- }
-
- //resume the adaptor for the file now that we have emptied it (i.e. rolled it over)
- if (chukwaClient.isFilePaused(getRecordType(), fileName)){
- chukwaClient.resumeFile(getRecordType(), fileName);
- }
- else {
- LogLog.warn("chukwa appender for file " + fileName + " was not paused, so we didn't do resumeFile() for it");
- }
-
+ }
scheduledFilename = datedFilename;
}
+
+ private class ClientFinalizer extends Thread
+ {
+ private ChukwaAgentController chukwaClient = null;
+ private String recordType = null;
+ private String fileName = null;
+ public ClientFinalizer(ChukwaAgentController chukwaClient,String recordType, String fileName)
+ {
+ this.chukwaClient = chukwaClient;
+ this.recordType = recordType;
+ this.fileName = fileName;
+ }
+ public synchronized void run()
+ {
+ try
+ {
+ if (chukwaClient != null)
+ {
+ log.debug("ShutdownHook: removing:" + fileName);
+ chukwaClient.removeFile(recordType, fileName);
+ }
+ else
+ {
+ LogLog.warn("chukwaClient is null cannot do any cleanup");
+ }
+ }
+ catch (Throwable e)
+ {
+ LogLog.warn("closing the controller threw an exception:\n" + e);
+ }
+ }
+ }
+ private ClientFinalizer clientFinalizer = null;
+
/**
* This method differentiates DailyRollingFileAppender from its
* super class.
@@ -384,64 +412,96 @@
* time to do a rollover. If it is, it will schedule the next
* rollover time and then rollover.
* */
- protected void subAppend(LoggingEvent event) {
- //we set up the chukwa adaptor here because this is the first
- //point which is called after all setters have been called with
- //their values from the log4j.properties file, in particular we
- //needed to give setCukwaClientPortNum() and -Hostname() a shot
- if (chukwaClient == null){
- if (getChukwaClientHostname() != null && getChukwaClientPortNum() != 0){
- chukwaClient = new ChukwaAgentController(getChukwaClientHostname(), getChukwaClientPortNum());
- System.out.println("setup adaptor with hostname " + getChukwaClientHostname() + " and portnum " + getChukwaClientPortNum());
- }
- else{
- chukwaClient = new ChukwaAgentController();
- System.out.println("setup adaptor with no args, which means it used its defaults");
- }
-
- //if they haven't specified, default to retrying every 10 seconds for 5 minutes
- long retryInterval = chukwaClientConnectRetryInterval;
- if (retryInterval == 0)
- retryInterval = 1000;
- long numRetries = chukwaClientConnectNumRetry;
- if (numRetries == 0)
- numRetries = 30;
- long adaptorID = chukwaClient.addFile(getRecordType(), getFile(), numRetries, retryInterval);
- if (adaptorID > 0){
- System.out.println("Added file tailing adaptor to chukwa agent for file " + getFile());
- }
- else{
- System.out.println("Chukwa adaptor not added, addFile(" + getFile() + ") returned " + adaptorID);
- }
- }
- long n = System.currentTimeMillis();
- if (n >= nextCheck) {
- now.setTime(n);
- nextCheck = rc.getNextCheckMillis(now);
- try {
- rollOver();
- }
- catch(IOException ioe) {
- LogLog.error("rollOver() failed.", ioe);
- }
- }
- //escape the newlines from record bodies and then write this record to the log file
- this.qw.write(RecordConstants.escapeAllButLastRecordSeparator("\n",this.layout.format(event)));
+ protected void subAppend(LoggingEvent event)
+ {
+ try
+ {
+ //we set up the chukwa adaptor here because this is the first
+ //point which is called after all setters have been called with
+ //their values from the log4j.properties file, in particular we
+ //needed to give setCukwaClientPortNum() and -Hostname() a shot
+
+ // Make sure only one thread can do this
+ // and use the boolean to avoid the first level locking
+ if (chukwaClientIsNull)
+ {
+ synchronized(chukwaLock)
+ {
+ if (chukwaClient == null){
+ if (getChukwaClientHostname() != null && getChukwaClientPortNum() != 0){
+ chukwaClient = new ChukwaAgentController(getChukwaClientHostname(), getChukwaClientPortNum());
+ log.debug("setup adaptor with hostname " + getChukwaClientHostname() + " and portnum " + getChukwaClientPortNum());
+ }
+ else{
+ chukwaClient = new ChukwaAgentController();
+ log.debug("setup adaptor with no args, which means it used its defaults");
+ }
+
+ chukwaClientIsNull = false;
+
+ //if they haven't specified, default to retrying every minute for 2 hours
+ long retryInterval = chukwaClientConnectRetryInterval;
+ if (retryInterval == 0)
+ retryInterval = 1000 * 60;
+ long numRetries = chukwaClientConnectNumRetry;
+ if (numRetries == 0)
+ numRetries = 120;
+ String log4jFileName = getFile();
+ String recordType = getRecordType();
+ long adaptorID = chukwaClient.addFile(recordType, log4jFileName, numRetries, retryInterval);
+
+ // Setup a shutdownHook for the controller
+ clientFinalizer = new ClientFinalizer(chukwaClient,recordType,log4jFileName);
+ Runtime.getRuntime().addShutdownHook(clientFinalizer);
+
+
+ if (adaptorID > 0){
+ log.debug("Added file tailing adaptor to chukwa agent for file " + log4jFileName + "using this recordType :" + recordType);
+ }
+ else{
+ log.debug("Chukwa adaptor not added, addFile(" + log4jFileName + ") returned " + adaptorID);
+ }
+
+ }
+ }
+ }
+
+
+ long n = System.currentTimeMillis();
+ if (n >= nextCheck) {
+ now.setTime(n);
+ nextCheck = rc.getNextCheckMillis(now);
+ try {
+ rollOver();
+ }
+ catch(IOException ioe) {
+ LogLog.error("rollOver() failed.", ioe);
+ }
+ }
+ //escape the newlines from record bodies and then write this record to the log file
+ this.qw.write(RecordConstants.escapeAllButLastRecordSeparator("\n",this.layout.format(event)));
+
+ if(layout.ignoresThrowable()) {
+ String[] s = event.getThrowableStrRep();
+ if (s != null) {
+ int len = s.length;
+ for(int i = 0; i < len; i++) {
+ this.qw.write(s[i]);
+ this.qw.write(Layout.LINE_SEP);
+ }
+ }
+ }
+
+ if(this.immediateFlush) {
+ this.qw.flush();
+ }
+ }
+ catch(Throwable e)
+ {
+ System.err.println("Exception in ChukwaRollingAppender: " + e.getMessage());
+ e.printStackTrace();
+ }
- if(layout.ignoresThrowable()) {
- String[] s = event.getThrowableStrRep();
- if (s != null) {
- int len = s.length;
- for(int i = 0; i < len; i++) {
- this.qw.write(s[i]);
- this.qw.write(Layout.LINE_SEP);
- }
- }
- }
-
- if(this.immediateFlush) {
- this.qw.flush();
- }
}
public String getChukwaClientHostname() {
@@ -479,7 +539,11 @@
* */
class RollingCalendar extends GregorianCalendar {
- int type = ChukwaDailyRollingFileAppender.TOP_OF_TROUBLE;
+ /**
+ *
+ */
+ private static final long serialVersionUID = 2153481574198792767L;
+int type = ChukwaDailyRollingFileAppender.TOP_OF_TROUBLE;
RollingCalendar() {
super();
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4JMetricsContext.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4JMetricsContext.java?rev=709533&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4JMetricsContext.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4JMetricsContext.java Fri Oct 31 11:57:04 2008
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.inputtools.log4j;
+
+import java.io.*;
+
+import org.apache.hadoop.metrics.ContextFactory;
+import org.apache.hadoop.metrics.MetricsException;
+import org.apache.hadoop.metrics.spi.AbstractMetricsContext;
+import org.apache.hadoop.metrics.spi.OutputRecord;
+import org.apache.log4j.Logger;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+public class Log4JMetricsContext extends AbstractMetricsContext {
+
+ Logger out = null; //Logger.getLogger(Log4JMetricsContext.class);
+
+ /* Configuration attribute names */
+// protected static final String FILE_NAME_PROPERTY = "fileName";
+ protected static final String PERIOD_PROPERTY = "period";
+
+
+ /** Creates a new instance of FileContext */
+ public Log4JMetricsContext() {}
+
+ public void init(String contextName, ContextFactory factory) {
+ super.init(contextName, factory);
+ /*
+ String fileName = getAttribute(FILE_NAME_PROPERTY);
+ if (fileName != null) {
+ file = new File(fileName);
+ }
+ */
+ out = Logger.getLogger("chukwa.hadoop.metrics."+contextName);
+ String periodStr = getAttribute(PERIOD_PROPERTY);
+ if (periodStr != null) {
+ int period = 0;
+ try {
+ period = Integer.parseInt(periodStr);
+ } catch (NumberFormatException nfe) {
+ }
+ if (period <= 0) {
+ throw new MetricsException("Invalid period: " + periodStr);
+ }
+ setPeriod(period);
+ }
+ }
+
+ @Override
+ protected void emitRecord(String contextName, String recordName, OutputRecord outRec)
+ throws IOException
+ {
+ JSONObject json = new JSONObject();
+ try {
+ json.put("contextName", contextName);
+ json.put("recordName", recordName);
+ json.put("chukwa_timestamp", System.currentTimeMillis());
+ for (String tagName : outRec.getTagNames()) {
+ json.put(tagName, outRec.getTag(tagName));
+ }
+ for (String metricName : outRec.getMetricNames()) {
+ json.put(metricName, outRec.getMetric(metricName));
+ }
+ } catch (JSONException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ out.info(json.toString());
+ }
+
+}
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/ErStreamHandler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/ErStreamHandler.java?rev=709533&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/ErStreamHandler.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/ErStreamHandler.java Fri Oct 31 11:57:04 2008
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.inputtools.mdl;
+
+import java.lang.Thread;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.BufferedReader;
+import java.lang.StringBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class ErStreamHandler extends Thread{
+ InputStream inpStr;
+ String command;
+ boolean record;
+
+ private static Log log = LogFactory.getLog(ErStreamHandler.class);
+
+ public ErStreamHandler(InputStream inpStr,String command,boolean record){
+ this.inpStr=inpStr;
+ this.command=command;
+ this.record=record;
+
+ }
+
+ public void run(){
+ try {
+ InputStreamReader inpStrd=new InputStreamReader(inpStr);
+ BufferedReader buffRd=new BufferedReader(inpStrd);
+ String line=null;
+ StringBuffer sb=new StringBuffer();
+ while((line=buffRd.readLine())!= null){
+ sb.append(line);
+ }
+ buffRd.close();
+
+ if (record && sb.length()>0) {
+ log.error(command+" execution error:"+sb.toString());
+ }
+
+ }catch (Exception e){
+ log.error(command+" error:"+e.getMessage());
+ }
+ }
+
+
+}
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueDataLoader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueDataLoader.java?rev=709533&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueDataLoader.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueDataLoader.java Fri Oct 31 11:57:04 2008
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.inputtools.mdl;
+
+import java.lang.Thread;
+import java.lang.management.ManagementFactory;
+import java.io.FileOutputStream;
+import java.sql.SQLException;
+import java.io.IOException;
+import java.io.File;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.chukwa.inputtools.mdl.TorqueInfoProcessor;
+import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
+import org.apache.hadoop.chukwa.util.PidFile;
+
+public class TorqueDataLoader {
+ private static Log log = LogFactory.getLog("TorqueDataLoader");
+
+ private TorqueInfoProcessor tp=null;
+ private PidFile loader=null;
+
+
+ public TorqueDataLoader (DataConfig mdlConfig, int interval){
+ log.info("in torqueDataLoader");
+ tp = new TorqueInfoProcessor(mdlConfig, interval);
+ loader=new PidFile("TorqueDataLoader");
+ }
+
+
+ public void run(){
+ boolean first=true;
+ while(true){
+ try{
+ tp.setup(first);
+ first=false;
+ }catch (Exception ex){
+ tp.shutdown();
+
+ if (first){
+ log.error("setup error");
+ ex.printStackTrace();
+ loader.clean(); // only call before system.exit()
+ System.exit(1);
+ }
+ log.error("setup fail, retry after 10 minutes");
+ try {
+ Thread.sleep(600*1000);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ log.error(e.getMessage());
+ // e.printStackTrace();
+ }
+ continue;
+
+ }
+
+ try{
+ tp.run_forever();
+ }catch (SQLException ex) {
+ tp.shutdown();
+ log.error("processor died, reconnect again after 10 minutes");
+ ex.printStackTrace();
+ try {
+ Thread.sleep(600*1000);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ log.error(e.getMessage());
+ // e.printStackTrace();
+ }
+ }catch (Exception ex){
+ try {
+ Thread.sleep(16*1000);
+ } catch (InterruptedException e) {
+ ;
+ }
+ tp.shutdown();
+ log.error("process died...."+ex.getMessage());
+ loader.clean();
+ System.exit(1);
+ }
+
+ }//while
+
+ }
+
+
+ public static void main(String[] args) {
+ /* if (args.length < 2 || args[0].startsWith("-h")
+ || args[0].startsWith("--h")) {
+ System.out.println("Usage: UtilDataLoader interval(sec)");
+ System.exit(1);puvw-./chij
+ }
+ String interval = args[0];
+ int intervalValue=Integer.parseInt(interval);
+ */
+ int intervalValue=60;
+
+
+ DataConfig mdlConfig=new DataConfig();
+
+ TorqueDataLoader tdl = new TorqueDataLoader(mdlConfig, intervalValue);
+ tdl.run();
+
+ }
+
+}
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java?rev=709533&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java Fri Oct 31 11:57:04 2008
@@ -0,0 +1,541 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.inputtools.mdl;
+
+import java.sql.SQLException;
+import java.sql.ResultSet;
+import java.lang.Exception;
+import java.util.Calendar;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.TreeMap;
+import java.util.Iterator;
+import java.lang.StringBuffer;
+import java.sql.Timestamp;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.lang.Thread;
+import java.util.Timer;
+import java.lang.ProcessBuilder;
+import java.lang.Process;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.InterruptedException;
+import java.lang.System;
+import java.util.Date;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
+import org.apache.hadoop.chukwa.inputtools.mdl.TorqueTimerTask;
+import org.apache.hadoop.chukwa.inputtools.mdl.ErStreamHandler;
+import org.apache.hadoop.chukwa.util.DatabaseWriter;
+
+
+public class TorqueInfoProcessor {
+
+ private static Log log = LogFactory.getLog(TorqueInfoProcessor.class);
+
+ private int intervalValue=60;
+ private String torqueServer = null;
+ private String torqueBinDir= null;
+ private String domain = null;
+
+ private TreeMap <String,TreeMap<String,String>> currentHodJobs;
+
+
+ public TorqueInfoProcessor(DataConfig mdlConfig, int interval){
+ this.intervalValue=interval;
+
+ torqueServer=System.getProperty("TORQUE_SERVER");
+ torqueBinDir=System.getProperty("TORQUE_HOME")+File.separator+"bin";
+ domain=System.getProperty("DOMAIN");
+ currentHodJobs=new TreeMap<String,TreeMap<String,String>>();
+ }
+
+
+
+ public void setup(boolean recover)throws Exception {
+ }
+
+ private void getHodJobInfo() throws IOException {
+ StringBuffer sb=new StringBuffer();
+ sb.append(torqueBinDir).append("/qstat -a");
+
+ String[] getQueueInfoCommand=new String [3];
+ getQueueInfoCommand[0]="ssh";
+ getQueueInfoCommand[1]=torqueServer;
+ getQueueInfoCommand[2]=sb.toString();
+
+
+ String command=getQueueInfoCommand[0]+" "+getQueueInfoCommand[1]+" "+getQueueInfoCommand[2];
+ ProcessBuilder pb= new ProcessBuilder(getQueueInfoCommand);
+
+ Process p=pb.start();
+
+ Timer timeout=new Timer();
+ TorqueTimerTask torqueTimerTask=new TorqueTimerTask(p, command);
+ timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval*1000);
+
+ BufferedReader result = new BufferedReader (new InputStreamReader (p.getInputStream()));
+ ErStreamHandler errorHandler=new ErStreamHandler(p.getErrorStream(),command,true);
+ errorHandler.start();
+
+ String line = null;
+ boolean start=false;
+ TreeSet<String> jobsInTorque=new TreeSet<String>();
+ while((line=result.readLine())!=null){
+ if (line.startsWith("---")){
+ start=true;
+ continue;
+ }
+
+ if(start){
+ String [] items=line.split("\\s+");
+ if (items.length>=10){
+ String hodIdLong=items[0];
+ String hodId=hodIdLong.split("[.]")[0];
+ String userId=items[1];
+ String numOfMachine=items[5];
+ String status=items[9];
+ jobsInTorque.add(hodId);
+ if (!currentHodJobs.containsKey(hodId)) {
+ TreeMap <String,String> aJobData=new TreeMap <String,String>();
+
+ aJobData.put("userId", userId);
+ aJobData.put("numOfMachine",numOfMachine);
+ aJobData.put("traceCheckCount","0");
+ aJobData.put("process", "0");
+ aJobData.put("status",status);
+ currentHodJobs.put(hodId,aJobData);
+ }else {
+ TreeMap <String,String> aJobData= currentHodJobs.get(hodId);
+ aJobData.put("status", status);
+ currentHodJobs.put(hodId,aJobData);
+ }//if..else
+ }
+ }
+ }//while
+
+ try {
+ errorHandler.join();
+ }catch (InterruptedException ie){
+ log.error(ie.getMessage());
+ }
+ timeout.cancel();
+
+ Set<String> currentHodJobIds=currentHodJobs.keySet();
+ Iterator<String> currentHodJobIdsIt=currentHodJobIds.iterator();
+ TreeSet<String> finishedHodIds=new TreeSet<String>();
+ while (currentHodJobIdsIt.hasNext()){
+ String hodId=currentHodJobIdsIt.next();
+ if (!jobsInTorque.contains(hodId)) {
+ TreeMap <String,String> aJobData=currentHodJobs.get(hodId);
+ String process=aJobData.get("process");
+ if (process.equals("0") || process.equals("1")) {
+ aJobData.put("status", "C");
+ }else {
+ finishedHodIds.add(hodId);
+ }
+ }
+ }//while
+
+ Iterator<String >finishedHodIdsIt=finishedHodIds.iterator();
+ while (finishedHodIdsIt.hasNext()){
+ String hodId=finishedHodIdsIt.next();
+ currentHodJobs.remove(hodId);
+ }
+
+ }
+
+ private boolean loadQstatData(String hodId) throws IOException, SQLException {
+ TreeMap<String,String> aJobData=currentHodJobs.get(hodId);
+ String userId=aJobData.get("userId");
+
+ StringBuffer sb=new StringBuffer();
+ sb.append(torqueBinDir).append("/qstat -f -1 ").append(hodId);
+ String[] qstatCommand=new String [3];
+ qstatCommand[0]="ssh";
+ qstatCommand[1]=torqueServer;
+ qstatCommand[2]=sb.toString();
+
+ String command=qstatCommand[0]+" "+qstatCommand[1]+" "+qstatCommand[2];
+ ProcessBuilder pb= new ProcessBuilder(qstatCommand);
+ Process p=pb.start();
+
+ Timer timeout=new Timer();
+ TorqueTimerTask torqueTimerTask=new TorqueTimerTask(p, command);
+ timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval*1000);
+
+ BufferedReader result = new BufferedReader (new InputStreamReader (p.getInputStream()));
+ ErStreamHandler errorHandler=new ErStreamHandler(p.getErrorStream(),command,false);
+ errorHandler.start();
+ String line=null;
+ String hosts=null;
+ long startTimeValue=-1;
+ long endTimeValue=Calendar.getInstance().getTimeInMillis();
+ long executeTimeValue=Calendar.getInstance().getTimeInMillis();
+ boolean qstatfinished;
+
+ while((line=result.readLine())!=null){
+ if (line.indexOf("ctime")>=0){
+ String startTime=line.split("=")[1].trim();
+ //Tue Sep 9 23:44:29 2008
+ SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
+ Date startTimeDate;
+ try {
+ startTimeDate = sdf.parse(startTime);
+ startTimeValue=startTimeDate.getTime();
+ } catch (ParseException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ }
+ if (line.indexOf("mtime")>=0){
+ String endTime=line.split("=")[1].trim();
+ SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
+ Date endTimeDate;
+ try {
+ endTimeDate = sdf.parse(endTime);
+ endTimeValue=endTimeDate.getTime();
+ } catch (ParseException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ }
+ if (line.indexOf("etime")>=0){
+ String executeTime=line.split("=")[1].trim();
+ SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
+ Date executeTimeDate;
+ try {
+ executeTimeDate = sdf.parse(executeTime);
+ executeTimeValue=executeTimeDate.getTime();
+ } catch (ParseException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ }
+ if (line.indexOf("exec_host")>=0){
+ hosts=line.split("=")[1].trim();
+ }
+ }
+
+ if (hosts!=null && startTimeValue>=0) {
+ String [] items2=hosts.split("[+]");
+ int num=0;
+ for (int i=0;i<items2.length;i++) {
+ String machinetmp=items2[i];
+ if( machinetmp.length()>3){
+ String machine=items2[i].substring(0,items2[i].length()-2);
+ StringBuffer data=new StringBuffer();
+ data.append("HodId=").append(hodId);
+ data.append(", Machine=").append(machine);
+ if(domain!=null) {
+ data.append(".").append(domain);
+ }
+ log.info(data);
+ num++;
+ }
+ }
+ Timestamp startTimedb=new Timestamp(startTimeValue);
+ Timestamp endTimedb=new Timestamp(endTimeValue);
+ StringBuffer data=new StringBuffer();
+ long timeQueued=executeTimeValue-startTimeValue;
+ data.append("HodID=").append(hodId);
+ data.append(", UserId=").append(userId);
+ data.append(", StartTime=").append(startTimedb);
+ data.append(", TimeQueued=").append(timeQueued);
+ data.append(", NumOfMachines=").append(num);
+ data.append(", EndTime=").append(endTimedb);
+ //log.info(data);
+ qstatfinished=true;
+
+ } else{
+
+ qstatfinished=false;
+ }
+
+ try {
+ errorHandler.join();
+ }catch (InterruptedException ie){
+ log.error(ie.getMessage());
+ }
+ result.close();
+ timeout.cancel();
+
+ return qstatfinished;
+ }
+
+
+ private boolean loadTraceJobData(String hodId) throws IOException,SQLException{
+ TreeMap<String,String> aJobData=currentHodJobs.get(hodId);
+ //String queue=aJobData.get("queue");
+ String userId=aJobData.get("userId");
+ String process=aJobData.get("process");
+ //String numOfMachine=aJobData.get("numOfMachine");
+
+ //StringBuffer traceJobsb=new StringBuffer();
+ StringBuffer sb=new StringBuffer();
+ sb.append(torqueBinDir).append("/tracejob -n 10 -l -m -s ").append(hodId);
+ //ProcessBuilder pb= new ProcessBuilder(getQueueInfoCommand.toString());
+ String[] traceJobCommand=new String [3];
+ traceJobCommand[0]="ssh";
+ traceJobCommand[1]=torqueServer;
+ traceJobCommand[2]=sb.toString();
+
+ String command=traceJobCommand[0]+" "+traceJobCommand[1]+" "+traceJobCommand[2];
+ //System.out.println(command);
+ ProcessBuilder pb= new ProcessBuilder(traceJobCommand);
+
+ //String testCommand="/home/lyyang/work/chukwa/src/java/org/apache/hadoop/chukwa/ikit/sleeping";
+ //ProcessBuilder pb= new ProcessBuilder(testCommand);
+ //pb.redirectErrorStream(false);
+
+ Process p=pb.start();
+
+ Timer timeout=new Timer();
+ TorqueTimerTask torqueTimerTask=new TorqueTimerTask(p, command);
+ timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval*1000);
+
+ BufferedReader result = new BufferedReader (new InputStreamReader (p.getInputStream()));
+ ErStreamHandler errorHandler=new ErStreamHandler(p.getErrorStream(),command,false);
+ errorHandler.start();
+ String line=null;
+ /*
+ BufferedReader error = new BufferedReader (new InputStreamReader(p.getErrorStream()));
+ String line = null;
+ boolean start=false;
+ TreeSet<String> jobsInTorque=new TreeSet<String>();
+ String errorLine = null;;
+ while((errorLine=error.readLine())!=null) {
+ //discard the error message;
+ ;
+ }
+ */
+ String exit_status=null;
+ String hosts=null;
+ long timeQueued=-1;
+ long startTimeValue=-1;
+ long endTimeValue=-1;
+ boolean findResult=false;
+
+
+ while((line=result.readLine())!=null&& ! findResult){
+ if (line.indexOf("end")>=0 &&line.indexOf("Exit_status")>=0 && line.indexOf("qtime")>=0){
+ TreeMap <String,String> jobData=new TreeMap <String,String>() ;
+ String [] items=line.split("\\s+");
+ for (int i=0;i<items.length; i++) {
+ String [] items2 = items[i].split("=");
+ if (items2.length>=2){
+ jobData.put(items2[0], items2[1]);
+ }
+
+ }
+ String startTime=jobData.get("ctime");
+ startTimeValue=Long.valueOf(startTime);
+ startTimeValue=startTimeValue-startTimeValue%(60);
+ Timestamp startTimedb=new Timestamp(startTimeValue*1000);
+
+ String queueTime=jobData.get("qtime");
+ long queueTimeValue=Long.valueOf(queueTime);
+
+ String sTime=jobData.get("start");
+ long sTimeValue=Long.valueOf(sTime);
+
+ timeQueued=sTimeValue-queueTimeValue;
+
+ String endTime=jobData.get("end");
+ endTimeValue=Long.valueOf(endTime);
+ endTimeValue=endTimeValue-endTimeValue%(60);
+ Timestamp endTimedb=new Timestamp(endTimeValue*1000);
+
+ exit_status=jobData.get("Exit_status");
+ //if (process.equals("0")){
+ hosts=jobData.get("exec_host");
+ String [] items2=hosts.split("[+]");
+ int num=0;
+ for (int i=0;i<items2.length;i++) {
+ String machinetemp=items2[i];
+ if (machinetemp.length()>=3){
+
+ String machine=items2[i].substring(0,items2[i].length()-2);
+ StringBuffer data=new StringBuffer();
+ data.append("HodId=").append(hodId);
+ data.append(", Machine=").append(machine);
+ if(domain!=null) {
+ data.append(".").append(domain);
+ }
+ log.info(data.toString());
+ num++;
+ }
+ }
+
+ StringBuffer data=new StringBuffer();
+ data.append("HodID=").append(hodId);
+ data.append(", UserId=").append(userId);
+ data.append(", Status=").append(exit_status);
+ data.append(", TimeQueued=").append(timeQueued);
+ data.append(", StartTime=").append(startTimedb);
+ data.append(", EndTime=").append(endTimedb);
+ data.append(", NumOfMachines=").append(num);
+ log.info(data.toString());
+// } else{
+// StringBuffer data=new StringBuffer();
+// data.append("HodID=").append(hodId);
+// data.append(", TimeQueued=").append(timeQueued);
+// data.append(", EndTime=").append(endTimedb);
+// data.append(", Status=").append(exit_status);
+// log.info(data.toString());
+// }
+ findResult=true;
+ log.debug(" hod info for job "+hodId+" has been loaded ");
+ }//if
+
+ }//while
+
+ try {
+ errorHandler.join();
+ }catch (InterruptedException ie){
+ log.error(ie.getMessage());
+ }
+
+ timeout.cancel();
+ boolean tracedone=false;
+ if (!findResult){
+
+ String traceCheckCount=aJobData.get("traceCheckCount");
+ int traceCheckCountValue=Integer.valueOf(traceCheckCount);
+ traceCheckCountValue=traceCheckCountValue+1;
+ aJobData.put("traceCheckCount",String.valueOf(traceCheckCountValue));
+
+
+ log.debug("did not find tracejob info for job "+hodId+", after "+traceCheckCountValue+" times checking");
+ if (traceCheckCountValue>=2){
+ tracedone= true;
+
+// StringBuffer deletesb1=new StringBuffer();
+// deletesb1.append(" Delete from ").append(hodJobTable);
+// deletesb1.append(" where hodid='").append(hodId).append("'");
+// String delete1=deletesb1.toString();
+//
+//// dbWriter.execute(delete1);
+//
+// StringBuffer deletesb2=new StringBuffer();
+// deletesb2.append(" Delete from ").append(hodMachineTable);
+// deletesb2.append(" where hodid='").append(hodId).append("'");
+// String delete2=deletesb2.toString();
+//// dbWriter.execute(delete2);
+ }
+ }
+ boolean finished=findResult|tracedone;
+
+
+ return finished;
+
+ // return true;
+ }
+
+
+
+ private void process_data() throws SQLException{
+
+ long currentTime=System.currentTimeMillis();
+ currentTime=currentTime-currentTime%(60*1000);
+ Timestamp timestamp=new Timestamp(currentTime);
+
+ Set<String> hodIds=currentHodJobs.keySet();
+
+ Iterator<String> hodIdsIt=hodIds.iterator();
+ while (hodIdsIt.hasNext()){
+ String hodId=(String) hodIdsIt.next();
+ TreeMap<String,String> aJobData=currentHodJobs.get(hodId);
+ //String queue=aJobData.get("queue");
+ //String numOfMachine=aJobData.get("numOfMachine");
+ String status=aJobData.get("status");
+ String process=aJobData.get("process");
+ if (process.equals("0") && (status.equals("R") ||status.equals("E"))){
+ try {
+ boolean result=loadQstatData(hodId);
+ if (result){
+ aJobData.put("process","1");
+ currentHodJobs.put(hodId, aJobData);
+ }
+ }catch (IOException ioe){
+ log.error("load qsat data Error:"+ioe.getMessage());
+
+ }
+ }
+ if (! process.equals("2") && status.equals("C")){
+ try {
+ boolean result=loadTraceJobData(hodId);
+
+ if (result){
+ aJobData.put("process","2");
+ currentHodJobs.put(hodId, aJobData);
+ }
+ }catch (IOException ioe){
+ log.error("loadTraceJobData Error:"+ioe.getMessage());
+ }
+ }//if
+
+
+ } //while
+
+ }
+
+
+ private void handle_jobData() throws SQLException{
+ try{
+ getHodJobInfo();
+ }catch (IOException ex){
+ log.error("getQueueInfo Error:"+ex.getMessage());
+ return;
+ }
+ try{
+ process_data();
+ } catch (SQLException ex){
+ log.error("process_data Error:"+ex.getMessage());
+ throw ex;
+ }
+ }
+
+
+ public void run_forever() throws SQLException{
+ while(true){
+ handle_jobData();
+ try {
+ log.debug("sleeping ...");
+ Thread.sleep(this.intervalValue*1000);
+ } catch (InterruptedException e) {
+ log.error(e.getMessage());
+ }
+ }
+ }
+
+
+ public void shutdown(){
+ }
+
+
+
+}
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java?rev=709533&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java Fri Oct 31 11:57:04 2008
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.inputtools.mdl;
+
+import java.util.TimerTask;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class TorqueTimerTask extends TimerTask{
+ private Process ps=null;
+ private String command;
+
+ private static Log log = LogFactory.getLog(TorqueTimerTask.class);
+ //public static int timeoutInterval=300;
+ public static int timeoutInterval=180;
+
+ public TorqueTimerTask() {
+ super();
+ // TODO Auto-generated constructor stub
+ }
+
+ public TorqueTimerTask(Process process,String command){
+ super();
+ this.ps=process;
+ this.command=command;
+
+ }
+
+ public void run() {
+ ps.destroy();
+ log.error("torque command: "+command+" timed out");
+
+ }
+
+}
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Exec.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Exec.java?rev=709533&r1=709532&r2=709533&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Exec.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Exec.java Fri Oct 31 11:57:04 2008
@@ -1,47 +1,67 @@
package org.apache.hadoop.chukwa.inputtools.plugin.metrics;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Timer;
+import java.util.TimerTask;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.chukwa.inputtools.plugin.ExecPlugin;
import org.apache.hadoop.chukwa.inputtools.plugin.IPlugin;
+import org.apache.hadoop.chukwa.util.PidFile;
import org.json.JSONException;
import org.json.JSONObject;
-public class Exec extends ExecPlugin
-{
+public class Exec extends TimerTask {
private static Log log = LogFactory.getLog(Exec.class);
private String cmde = null;
-
- public Exec(String[] cmds)
- {
+ private static PidFile pFile = null;
+ private Timer timer = null;
+ private IPlugin plugin = null;
+
+ public Exec(String[] cmds) {
StringBuffer c = new StringBuffer();
for(String cmd : cmds) {
c.append(cmd);
c.append(" ");
}
cmde = c.toString();
+ plugin = new ExecHelper(cmds);
+ }
+ public void run() {
+ try {
+ JSONObject result = plugin.execute();
+ if (result.getInt("status") < 0) {
+ System.out.println("Error");
+ log.warn("[ChukwaError]:"+ Exec.class + ", " + result.getString("stderr"));
+ System.exit(-1);
+ } else {
+ log.info(result.get("stdout"));
+ }
+ } catch(JSONException e) {
+ log.error("Exec output unparsable:"+this.cmde);
+ }
}
-
- @Override
- public String getCmde()
- {
+ public String getCmde() {
return cmde;
}
-
- public static void main(String[] args) throws JSONException
- {
- IPlugin plugin = new Exec(args);
- JSONObject result = plugin.execute();
- if (result.getInt("status") < 0)
- {
- System.out.println("Error");
- log.warn("[ChukwaError]:"+ Exec.class + ", " + result.getString("stderr"));
- System.exit(-1);
- }
- else
- {
- log.info(result.get("stdout"));
- }
- System.exit(0);
+
+ public static void main(String[] args) {
+ pFile=new PidFile(System.getProperty("RECORD_TYPE")+"-data-loader");
+ Runtime.getRuntime().addShutdownHook(pFile);
+ int period = 60;
+ try {
+ if(System.getProperty("PERIOD")!=null) {
+ period = Integer.parseInt(System.getProperty("PERIOD"));
+ }
+ } catch(NumberFormatException ex) {
+ ex.printStackTrace();
+ System.out.println("Usage: java -DPERIOD=nn -DRECORD_TYPE=recordType Exec [cmd]");
+ System.out.println("PERIOD should be numeric format of seconds.");
+ System.exit(0);
+ }
+ Timer timer = new Timer();
+ timer.schedule(new Exec(args),0, period*1000);
}
}
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/ExecHelper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/ExecHelper.java?rev=709533&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/ExecHelper.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/ExecHelper.java Fri Oct 31 11:57:04 2008
@@ -0,0 +1,34 @@
+package org.apache.hadoop.chukwa.inputtools.plugin.metrics;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.chukwa.inputtools.plugin.ExecPlugin;
+import org.apache.hadoop.chukwa.inputtools.plugin.IPlugin;
+import org.apache.hadoop.chukwa.util.PidFile;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+public class ExecHelper extends ExecPlugin {
+ private static Log log = LogFactory.getLog(ExecHelper.class);
+ private String cmde = null;
+ private static PidFile pFile = null;
+ private Timer timer = null;
+
+ public ExecHelper(String[] cmds) {
+ StringBuffer c = new StringBuffer();
+ for(String cmd : cmds) {
+ c.append(cmd);
+ c.append(" ");
+ }
+ cmde = c.toString();
+ }
+
+ public String getCmde() {
+ return cmde;
+ }
+}
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Log4JMetricsContext.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Log4JMetricsContext.java?rev=709533&r1=709532&r2=709533&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Log4JMetricsContext.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Log4JMetricsContext.java Fri Oct 31 11:57:04 2008
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.chukwa.inputtools.plugin.metrics;
-
-import java.io.*;
-
-import org.apache.hadoop.metrics.ContextFactory;
-import org.apache.hadoop.metrics.MetricsException;
-import org.apache.hadoop.metrics.spi.AbstractMetricsContext;
-import org.apache.hadoop.metrics.spi.OutputRecord;
-import org.apache.log4j.Logger;
-
-public class Log4JMetricsContext extends AbstractMetricsContext {
-
- static Logger out = Logger.getLogger(Log4JMetricsContext.class);
-
- /* Configuration attribute names */
-// protected static final String FILE_NAME_PROPERTY = "fileName";
- protected static final String PERIOD_PROPERTY = "period";
-
-
- /** Creates a new instance of FileContext */
- public Log4JMetricsContext() {}
-
- public void init(String contextName, ContextFactory factory) {
- super.init(contextName, factory);
- /*
- String fileName = getAttribute(FILE_NAME_PROPERTY);
- if (fileName != null) {
- file = new File(fileName);
- }
- */
- String periodStr = getAttribute(PERIOD_PROPERTY);
- if (periodStr != null) {
- int period = 0;
- try {
- period = Integer.parseInt(periodStr);
- } catch (NumberFormatException nfe) {
- }
- if (period <= 0) {
- throw new MetricsException("Invalid period: " + periodStr);
- }
- setPeriod(period);
- }
- }
-
- @Override
- protected void emitRecord(String contextName, String recordName, OutputRecord outRec)
- throws IOException
- {
- StringBuilder writer = new StringBuilder();
- String separator = " ";
- writer.append("contextName=");
- writer.append(contextName);
-
- writer.append(separator);
- writer.append("recordName=");
- writer.append(recordName);
-
-
- writer.append(separator);
- writer.append("chukwa_timestamp="+ System.currentTimeMillis());
- writer.append(recordName);
-
- for (String tagName : outRec.getTagNames()) {
- writer.append(separator);
- writer.append(tagName);
- writer.append("=");
- writer.append(outRec.getTag(tagName));
- }
- for (String metricName : outRec.getMetricNames()) {
- writer.append(separator);
- writer.append(metricName);
- writer.append("=");
- writer.append(outRec.getMetric(metricName));
- }
-
- out.info(writer.toString());
-// out.println(writer);
- }
-
-}
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/ClusterConfig.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/ClusterConfig.java?rev=709533&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/ClusterConfig.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/ClusterConfig.java Fri Oct 31 11:57:04 2008
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.util;
+
+import java.io.*;
+import java.util.*;
+
+public class ClusterConfig {
+ public static HashMap<String, String> clusterMap = new HashMap<String, String>();
+ private String path=System.getenv("CHUKWA_CONF_DIR")+File.separator;
+ static public String getContents(File aFile) {
+ //...checks on aFile are elided
+ StringBuffer contents = new StringBuffer();
+
+ try {
+ //use buffering, reading one line at a time
+ //FileReader always assumes default encoding is OK!
+ BufferedReader input = new BufferedReader(new FileReader(aFile));
+ try {
+ String line = null; //not declared within while loop
+ /*
+ * readLine is a bit quirky :
+ * it returns the content of a line MINUS the newline.
+ * it returns null only for the END of the stream.
+ * it returns an empty String if two newlines appear in a row.
+ */
+ while (( line = input.readLine()) != null){
+ contents.append(line);
+ contents.append(System.getProperty("line.separator"));
+ }
+ } finally {
+ input.close();
+ }
+ }
+ catch (IOException ex){
+ ex.printStackTrace();
+ }
+
+ return contents.toString();
+ }
+
+ public ClusterConfig() {
+ File cc = new File(path+"jdbc.conf");
+ String buffer = getContents(cc);
+ String[] lines = buffer.split("\n");
+ for(String line: lines) {
+ String[] data = line.split("=",2);
+ clusterMap.put(data[0],data[1]);
+ }
+ }
+
+ public String getURL(String cluster) {
+ String url = clusterMap.get(cluster);
+ return url;
+ }
+
+ public Iterator<String> getClusters() {
+ Set<String> keys = clusterMap.keySet();
+ Iterator<String> i = keys.iterator();
+ return i;
+ }
+}
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/PidFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/PidFile.java?rev=709533&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/PidFile.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/PidFile.java Fri Oct 31 11:57:04 2008
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.util;
+
+import java.io.IOException;
+import java.io.File;
+import java.io.*;
+import java.lang.management.ManagementFactory;
+import java.nio.channels.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+public class PidFile extends Thread {
+
+ String name;
+ private static Log log = LogFactory.getLog(PidFile.class);
+ private static FileLock lock = null;
+ private static FileOutputStream pidFileOutput = null;
+
+ public PidFile(String name){
+ this.name=name;
+ try {
+ init();
+ } catch(IOException ex) {
+ clean();
+ System.exit(-1);
+ }
+ }
+
+ public void init() throws IOException{
+ String pidLong=ManagementFactory.getRuntimeMXBean().getName();
+ String[] items=pidLong.split("@");
+ String pid=items[0];
+ String chukwaPath=System.getProperty("CHUKWA_HOME");
+ StringBuffer pidFilesb=new StringBuffer();
+ String pidDir = chukwaPath+File.separator+"var"+File.separator+"run";
+ pidFilesb.append(pidDir).append(File.separator).append(name).append(".pid");
+ try{
+ File existsFile = new File(pidDir);
+ if(!existsFile.exists()) {
+ boolean success = (new File(pidDir)).mkdirs();
+ if(!success) {
+ throw(new IOException());
+ }
+ }
+ File pidFile= new File(pidFilesb.toString());
+
+ pidFileOutput= new FileOutputStream(pidFile);
+ pidFileOutput.write(pid.getBytes());
+ pidFileOutput.flush();
+ FileChannel channel = pidFileOutput.getChannel();
+ PidFile.lock = channel.tryLock();
+ if(PidFile.lock!=null) {
+ log.debug("Initlization succeeded...");
+ } else {
+ throw(new IOException());
+ }
+ }catch (IOException ex){
+ System.out.println("Initializaiton failed: can not write pid file.");
+ log.error("Initialization failed...");
+ log.error(ex.getMessage());
+ System.exit(-1);
+ throw ex;
+
+ }
+
+ }
+
+ public void clean(){
+ String chukwaPath=System.getenv("CHUKWA_HOME");
+ StringBuffer pidFilesb=new StringBuffer();
+ pidFilesb.append(chukwaPath).append("/var/run/").append(name).append(".pid");
+ String pidFileName=pidFilesb.toString();
+
+ File pidFile=new File(pidFileName);
+ if (!pidFile.exists()) {
+ log.error("Delete pid file, No such file or directory: "+pidFileName);
+ } else {
+ try {
+ lock.release();
+ pidFileOutput.close();
+ } catch(IOException e) {
+ log.error("Unable to release file lock: "+pidFileName);
+ }
+ }
+
+ boolean result=pidFile.delete();
+ if (!result){
+ log.error("Delete pid file failed, "+pidFileName);
+ }
+ }
+
+ public void run() {
+ clean();
+ }
+}