Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/10/31 19:57:06 UTC

svn commit: r709533 [2/2] - in /hadoop/core/trunk: ./ src/contrib/chukwa/ src/contrib/chukwa/bin/ src/contrib/chukwa/conf/ src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/ src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/...

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java?rev=709533&r1=709532&r2=709533&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/ChukwaDailyRollingFileAppender.java Fri Oct 31 11:57:04 2008
@@ -9,23 +9,23 @@
 
 package org.apache.hadoop.chukwa.inputtools.log4j;
 
-import java.io.IOException;
 import java.io.File;
+import java.io.IOException;
 import java.text.SimpleDateFormat;
+import java.util.Calendar;
 import java.util.Date;
 import java.util.GregorianCalendar;
-import java.util.Calendar;
-import java.util.TimeZone;
 import java.util.Locale;
+import java.util.TimeZone;
 
+import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
+import org.apache.hadoop.chukwa.util.RecordConstants;
 import org.apache.log4j.FileAppender;
 import org.apache.log4j.Layout;
+import org.apache.log4j.Logger;
 import org.apache.log4j.helpers.LogLog;
 import org.apache.log4j.spi.LoggingEvent;
 
-import org.apache.hadoop.chukwa.util.RecordConstants;
-import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
-
 /**
     ChukwaDailyRollingFileAppender is a slightly modified version of
     DailyRollingFileAppender, with modified versions of its
@@ -129,14 +129,13 @@
     <p>Do not use the colon ":" character anywhere in the
     <b>DatePattern</b> option. The text before the colon is interpreted
     as the protocol specification of a URL, which is probably not what
-    you want.
+    you want. 
 
+*/
 
-    @author Eirik Lygre
-    @author Ceki G&uuml;lc&uuml; */
 public class ChukwaDailyRollingFileAppender extends FileAppender {
 
-
+	static Logger log = Logger.getLogger(ChukwaDailyRollingFileAppender.class);
  // The code assumes that the following constants are in an increasing
   // sequence.
   static final int TOP_OF_TROUBLE=-1;
@@ -149,6 +148,9 @@
 
   static final String adaptorType = ChukwaAgentController.CharFileTailUTF8NewLineEscaped;
 
+  static final Object lock = new Object();
+  static String lastRotation = "";
+  
   /**
     The date pattern. By default, the pattern is set to
     "'.'yyyy-MM-dd" meaning daily rollover.
@@ -180,6 +182,9 @@
   int checkPeriod = TOP_OF_TROUBLE;
 
   ChukwaAgentController chukwaClient;
+  boolean chukwaClientIsNull = true;
+  static final Object chukwaLock = new Object();
+  
   String chukwaClientHostname;
   int chukwaClientPortNum;
   long chukwaClientConnectNumRetry;
@@ -203,7 +208,7 @@
   /**
      Instantiate a <code>DailyRollingFileAppender</code> and open the
      file designated by <code>filename</code>. The opened filename will
-     become the ouput destination for this appender.
+     become the output destination for this appender.
 
    */
   public ChukwaDailyRollingFileAppender (Layout layout, String filename,
@@ -336,12 +341,10 @@
       return;
     }
 
+	
     // close current file, and rename it to datedFilename
     this.closeFile();
 
-    if (chukwaClient != null){
-      chukwaClient.pauseFile(getRecordType(),fileName);
-    }
 
     File target  = new File(scheduledFilename);
     if (target.exists()) {
@@ -363,19 +366,44 @@
     }
     catch(IOException e) {
       errorHandler.error("setFile("+fileName+", false) call failed.");
-    }
-
-    //resume the adaptor for the file now that we have emptied it (i.e. rolled it over)
-    if (chukwaClient.isFilePaused(getRecordType(), fileName)){
-      chukwaClient.resumeFile(getRecordType(), fileName);
-    }
-    else {
-      LogLog.warn("chukwa appender for file " + fileName + " was not paused, so we didn't do resumeFile() for it");
-    }
-    
+    }    
     scheduledFilename = datedFilename;
   }
 
+  
+  private class ClientFinalizer extends Thread 
+  {
+	  private ChukwaAgentController chukwaClient = null;
+	  private String recordType = null;
+	  private String fileName = null;
+	  public ClientFinalizer(ChukwaAgentController chukwaClient,String recordType, String fileName)
+	  {
+		  this.chukwaClient = chukwaClient;
+		  this.recordType = recordType;
+		  this.fileName = fileName;
+	  }
+	    public synchronized void run() 
+	    {
+	      try 
+	      {
+	    	  if (chukwaClient != null)
+	    	  {
+	    		  log.debug("ShutdownHook: removing:" + fileName);
+	    		  chukwaClient.removeFile(recordType, fileName);
+	    	  }
+	    	  else
+	    	  {
+	    		  LogLog.warn("chukwaClient is null, cannot do any cleanup");
+	    	  }
+	      } 
+	      catch (Throwable e) 
+	      {
+	    	  LogLog.warn("closing the controller threw an exception:\n" + e);
+	      }
+	    }
+	  }
+	  private ClientFinalizer clientFinalizer = null;
+  
   /**
    * This method differentiates DailyRollingFileAppender from its
    * super class.
@@ -384,64 +412,96 @@
    * time to do a rollover. If it is, it will schedule the next
    * rollover time and then rollover.
    * */
-  protected void subAppend(LoggingEvent event) {
-    //we set up the chukwa adaptor here because this is the first
-    //point which is called after all setters have been called with
-    //their values from the log4j.properties file, in particular we
-    //needed to give setCukwaClientPortNum() and -Hostname() a shot
-    if (chukwaClient == null){
-        if (getChukwaClientHostname() != null && getChukwaClientPortNum() != 0){
-        chukwaClient = new ChukwaAgentController(getChukwaClientHostname(), getChukwaClientPortNum());
-        System.out.println("setup adaptor with hostname " + getChukwaClientHostname() + " and portnum " + getChukwaClientPortNum());
-      }
-      else{
-        chukwaClient = new ChukwaAgentController();
-        System.out.println("setup adaptor with no args, which means it used its defaults");
-      }
-        
-      //if they haven't specified, default to retrying every 10 seconds for 5 minutes
-      long retryInterval = chukwaClientConnectRetryInterval;
-      if (retryInterval == 0)
-        retryInterval = 1000;
-      long numRetries = chukwaClientConnectNumRetry;
-      if (numRetries == 0)
-        numRetries = 30;
-      long adaptorID = chukwaClient.addFile(getRecordType(), getFile(), numRetries, retryInterval);
-      if (adaptorID > 0){
-        System.out.println("Added file tailing adaptor to chukwa agent for file " + getFile());
-      }
-      else{
-        System.out.println("Chukwa adaptor not added, addFile(" + getFile() + ") returned " + adaptorID);
-      }
-    }
-    long n = System.currentTimeMillis();
-    if (n >= nextCheck) {
-      now.setTime(n);
-      nextCheck = rc.getNextCheckMillis(now);
-      try {
-        rollOver();
-      }
-      catch(IOException ioe) {
-        LogLog.error("rollOver() failed.", ioe);
-      }
-    }
-    //escape the newlines from record bodies and then write this record to the log file
-    this.qw.write(RecordConstants.escapeAllButLastRecordSeparator("\n",this.layout.format(event)));
+  protected void subAppend(LoggingEvent event) 
+  {
+	  try
+	  {
+		  //we set up the chukwa adaptor here because this is the first
+		  //point which is called after all setters have been called with
+		  //their values from the log4j.properties file, in particular we
+		  //needed to give setChukwaClientPortNum() and -Hostname() a shot
+		  
+		  // Make sure only one thread can do this
+		  // and use the boolean to avoid the first level locking
+		  if (chukwaClientIsNull)
+		  {
+			  synchronized(chukwaLock)
+			  {
+				  if (chukwaClient == null){
+					  if (getChukwaClientHostname() != null && getChukwaClientPortNum() != 0){
+						  chukwaClient = new ChukwaAgentController(getChukwaClientHostname(), getChukwaClientPortNum());
+						  log.debug("setup adaptor with hostname " + getChukwaClientHostname() + " and portnum " + getChukwaClientPortNum());
+					  }
+					  else{
+						  chukwaClient = new ChukwaAgentController();
+						  log.debug("setup adaptor with no args, which means it used its defaults");
+					  }
+
+					  chukwaClientIsNull = false;
+					  
+					  //if they haven't specified, default to retrying every minute for 2 hours
+					  long retryInterval = chukwaClientConnectRetryInterval;
+					  if (retryInterval == 0)
+						  retryInterval = 1000 * 60;
+					  long numRetries = chukwaClientConnectNumRetry;
+					  if (numRetries == 0)
+						  numRetries = 120;
+					  String log4jFileName = getFile();
+					  String recordType = getRecordType();
+					  long adaptorID = chukwaClient.addFile(recordType, log4jFileName, numRetries, retryInterval);
+
+					  // Setup a shutdownHook for the controller
+					  clientFinalizer = new ClientFinalizer(chukwaClient,recordType,log4jFileName);
+					  Runtime.getRuntime().addShutdownHook(clientFinalizer);
+
+					  
+					  if (adaptorID > 0){
+					  log.debug("Added file tailing adaptor to chukwa agent for file " + log4jFileName + " using this recordType: " + recordType);
+					  }
+					  else{
+						  log.debug("Chukwa adaptor not added, addFile(" + log4jFileName + ") returned " + adaptorID);
+					  }
+					  
+				  }				  
+			  }
+		  }
+		  
+
+		  long n = System.currentTimeMillis();
+		  if (n >= nextCheck) {
+			  now.setTime(n);
+			  nextCheck = rc.getNextCheckMillis(now);
+			  try {
+				  rollOver();
+			  }
+			  catch(IOException ioe) {
+				  LogLog.error("rollOver() failed.", ioe);
+			  }
+		  }
+		  //escape the newlines from record bodies and then write this record to the log file
+		  this.qw.write(RecordConstants.escapeAllButLastRecordSeparator("\n",this.layout.format(event)));
+
+		  if(layout.ignoresThrowable()) {
+			  String[] s = event.getThrowableStrRep();
+			  if (s != null) {
+				  int len = s.length;
+				  for(int i = 0; i < len; i++) {
+					  this.qw.write(s[i]);
+					  this.qw.write(Layout.LINE_SEP);
+				  }
+			  }
+		  }
+
+		  if(this.immediateFlush) {
+			  this.qw.flush();
+		  }		  
+	  }
+	  catch(Throwable e)
+	  {
+		  System.err.println("Exception in ChukwaDailyRollingFileAppender: " + e.getMessage());
+		  e.printStackTrace();
+	  }
     
-    if(layout.ignoresThrowable()) {
-      String[] s = event.getThrowableStrRep();
-      if (s != null) {
-        int len = s.length;
-        for(int i = 0; i < len; i++) {
-          this.qw.write(s[i]);
-          this.qw.write(Layout.LINE_SEP);
-        }
-      }
-    }
-
-    if(this.immediateFlush) {
-      this.qw.flush();
-    }
   }
 
   public String getChukwaClientHostname() {
@@ -479,7 +539,11 @@
  * */
 class RollingCalendar extends GregorianCalendar {
 
-  int type = ChukwaDailyRollingFileAppender.TOP_OF_TROUBLE;
+  /**
+   * Serial version UID, needed because GregorianCalendar is Serializable.
+   */
+  private static final long serialVersionUID = 2153481574198792767L;
+  int type = ChukwaDailyRollingFileAppender.TOP_OF_TROUBLE;
 
   RollingCalendar() {
     super();

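The interesting change above is in subAppend(): adaptor registration now uses double-checked locking, with the plain boolean chukwaClientIsNull as a cheap first-level test and a null re-check on chukwaClient inside the synchronized block, while the pauseFile()/resumeFile() calls around rollover are replaced by a ClientFinalizer shutdown hook that calls removeFile(). A minimal sketch of the same locking shape in isolation (class and method names here are illustrative, not part of this commit):

    import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;

    public class LazyControllerHolder {
      private static final Object lock = new Object();
      private static boolean initialized = false;    // first-level check, read without the lock
      private static ChukwaAgentController client;

      static ChukwaAgentController get() {
        if (!initialized) {                          // fast path: skip locking once set up
          synchronized (lock) {
            if (client == null) {                    // re-check under the lock
              client = new ChukwaAgentController();
              initialized = true;                    // publish only after construction
            }
          }
        }
        return client;
      }
    }

Worth noting that without declaring the flag volatile this idiom is not strictly safe under the Java memory model.
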
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4JMetricsContext.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4JMetricsContext.java?rev=709533&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4JMetricsContext.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/log4j/Log4JMetricsContext.java Fri Oct 31 11:57:04 2008
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.inputtools.log4j;
+
+import java.io.*;
+
+import org.apache.hadoop.metrics.ContextFactory;
+import org.apache.hadoop.metrics.MetricsException;
+import org.apache.hadoop.metrics.spi.AbstractMetricsContext;
+import org.apache.hadoop.metrics.spi.OutputRecord;
+import org.apache.log4j.Logger;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+public class Log4JMetricsContext extends AbstractMetricsContext {
+
+  Logger out = null; //Logger.getLogger(Log4JMetricsContext.class);
+  
+  /* Configuration attribute names */
+//  protected static final String FILE_NAME_PROPERTY = "fileName";
+  protected static final String PERIOD_PROPERTY = "period";
+
+    
+  /** Creates a new instance of FileContext */
+  public Log4JMetricsContext() {}
+     
+  public void init(String contextName, ContextFactory factory) {
+    super.init(contextName, factory);
+  /*      
+    String fileName = getAttribute(FILE_NAME_PROPERTY);
+    if (fileName != null) {
+      file = new File(fileName);
+    }
+    */
+    out = Logger.getLogger("chukwa.hadoop.metrics."+contextName);
+    String periodStr = getAttribute(PERIOD_PROPERTY);
+    if (periodStr != null) {
+      int period = 0;
+      try {
+        period = Integer.parseInt(periodStr);
+      } catch (NumberFormatException nfe) {
+      }
+      if (period <= 0) {
+        throw new MetricsException("Invalid period: " + periodStr);
+      }
+      setPeriod(period);
+    }
+  }
+  
+  @Override
+  protected void emitRecord(String contextName, String recordName, OutputRecord outRec)
+      throws IOException
+  {
+	JSONObject json = new JSONObject();
+    try {
+		json.put("contextName", contextName);
+		json.put("recordName", recordName);
+		json.put("chukwa_timestamp", System.currentTimeMillis());
+	    for (String tagName : outRec.getTagNames()) {
+            json.put(tagName, outRec.getTag(tagName));
+	    }
+	    for (String metricName : outRec.getMetricNames()) {
+	    	json.put(metricName, outRec.getMetric(metricName));
+	    }
+    } catch (JSONException e) {
+		// TODO Auto-generated catch block
+		e.printStackTrace();
+	}    
+    out.info(json.toString());
+  }
+
+}

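The new Log4JMetricsContext is a Hadoop metrics context, so it gets enabled by naming it in the metrics configuration file. A hypothetical hadoop-metrics.properties wiring (the "dfs" context name and period are placeholders):

    dfs.class=org.apache.hadoop.chukwa.inputtools.log4j.Log4JMetricsContext
    dfs.period=60

Each emitRecord() then logs a single JSON object to the "chukwa.hadoop.metrics.<contextName>" logger, along the lines of (values invented):

    {"contextName":"dfs","recordName":"namenode","chukwa_timestamp":1225476000000,"hostName":"node1",...}
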
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/ErStreamHandler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/ErStreamHandler.java?rev=709533&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/ErStreamHandler.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/ErStreamHandler.java Fri Oct 31 11:57:04 2008
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.inputtools.mdl;
+
+import java.lang.Thread;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.BufferedReader;
+import java.lang.StringBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class ErStreamHandler extends Thread{
+	InputStream inpStr;
+	String command;
+	boolean record;
+	
+    private static Log log = LogFactory.getLog(ErStreamHandler.class);	
+    
+	public ErStreamHandler(InputStream inpStr,String command,boolean record){
+		this.inpStr=inpStr;
+		this.command=command;
+		this.record=record;
+
+	}
+
+	public void run(){
+		try {
+			InputStreamReader inpStrd=new InputStreamReader(inpStr);
+			BufferedReader buffRd=new BufferedReader(inpStrd);
+			String line=null;
+			StringBuffer sb=new StringBuffer();
+			while((line=buffRd.readLine())!= null){
+                 sb.append(line);			
+			}
+			buffRd.close();
+			
+			if (record && sb.length()>0) {
+				log.error(command+" execution error:"+sb.toString());				
+			}
+			
+		}catch (Exception e){
+			log.error(command+" error:"+e.getMessage());
+		}
+	}
+	
+	
+}

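ErStreamHandler's job is to drain a child process's stderr on a separate thread; if nothing consumes that stream, the OS pipe buffer can fill up and block the child. A minimal usage sketch under that assumption (the command is illustrative):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import org.apache.hadoop.chukwa.inputtools.mdl.ErStreamHandler;

    public class ErStreamHandlerDemo {
      public static void main(String[] args) throws Exception {
        Process p = new ProcessBuilder("ls", "/tmp").start();
        // consume stderr concurrently; record==true logs any stderr output as an error
        ErStreamHandler err = new ErStreamHandler(p.getErrorStream(), "ls /tmp", true);
        err.start();
        BufferedReader out = new BufferedReader(new InputStreamReader(p.getInputStream()));
        for (String line; (line = out.readLine()) != null; ) {
          System.out.println(line);
        }
        err.join();   // wait until stderr has been fully consumed
        p.waitFor();
      }
    }
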
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueDataLoader.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueDataLoader.java?rev=709533&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueDataLoader.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueDataLoader.java Fri Oct 31 11:57:04 2008
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.inputtools.mdl;
+
+import java.lang.Thread;
+import java.lang.management.ManagementFactory;
+import java.io.FileOutputStream;
+import java.sql.SQLException;
+import java.io.IOException;
+import java.io.File;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.chukwa.inputtools.mdl.TorqueInfoProcessor;
+import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
+import org.apache.hadoop.chukwa.util.PidFile;
+
+public class TorqueDataLoader {
+	   private static Log log = LogFactory.getLog("TorqueDataLoader");
+
+	    private TorqueInfoProcessor tp=null;
+        private PidFile loader=null;
+	  
+	    
+	    public TorqueDataLoader (DataConfig mdlConfig, int interval){
+	    	log.info("in torqueDataLoader");
+	   	    tp = new TorqueInfoProcessor(mdlConfig, interval);
+	   	    loader=new PidFile("TorqueDataLoader");
+	    }
+	    
+	    	    	    	    
+	    public void run(){
+ 	        boolean first=true;
+	   	    while(true){
+	       	   try{
+	   	           tp.setup(first);
+	   	           first=false;
+	   	        }catch (Exception ex){
+	   	    	   tp.shutdown();
+	   	           
+	   	    	   if (first){
+	   	    	      log.error("setup error");
+	   	    	      ex.printStackTrace();
+	   	    	      loader.clean(); // only call before system.exit()
+	   	    	      System.exit(1);
+	   	            }
+	   	           log.error("setup failed, retrying after 10 minutes");
+	   	           try {
+	                     Thread.sleep(600*1000);
+	               } catch (InterruptedException e) {
+	                // TODO Auto-generated catch block
+	                	 log.error(e.getMessage());
+	               // e.printStackTrace();
+	               }
+	   		       continue;   		 
+	   		 
+	   	       }
+	   	     
+	   	       try{
+	   		        tp.run_forever();
+	   	       }catch (SQLException ex) {
+	   		        tp.shutdown();
+	   	    	    log.error("processor died, reconnect again after 10 minutes");
+	   	    	    ex.printStackTrace();
+	   		        try {
+	                     Thread.sleep(600*1000);
+	                } catch (InterruptedException e) {
+	                     // TODO Auto-generated catch block
+	                	    log.error(e.getMessage());
+	                     // e.printStackTrace();
+	                }
+	   	       }catch (Exception ex){
+	   		        try {
+	                     Thread.sleep(16*1000);
+	                } catch (InterruptedException e) {
+	                            ;
+	                }
+	   	    	   tp.shutdown();
+	   	    	   log.error("process died...."+ex.getMessage());
+	   	    	   loader.clean();
+	   	    	   System.exit(1);
+	   	       }
+	   	       
+	   	  }//while
+	   
+	    }
+	    
+	    
+		 public static void main(String[] args) {
+			   /* if (args.length < 2 || args[0].startsWith("-h")
+			        || args[0].startsWith("--h")) {
+			      System.out.println("Usage: UtilDataLoader interval(sec)");
+			      System.exit(1);
+			    }
+			    String interval = args[0];
+			    int intervalValue=Integer.parseInt(interval);
+			    */
+			   int intervalValue=60;
+
+
+	           DataConfig mdlConfig=new DataConfig();
+	           
+	           TorqueDataLoader tdl = new TorqueDataLoader(mdlConfig, intervalValue);
+	           tdl.run();
+
+		 }        
+		
+}

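TorqueDataLoader wires a TorqueInfoProcessor to a PidFile and loops forever, retrying setup every ten minutes and exiting (after PidFile.clean()) only on unrecoverable errors. The processor pulls its connection settings from system properties, so a plausible invocation looks like this (all values are placeholders):

    java -DTORQUE_SERVER=torque.example.com \
         -DTORQUE_HOME=/usr/local/torque \
         -DDOMAIN=example.com \
         -DCHUKWA_HOME=/opt/chukwa \
         org.apache.hadoop.chukwa.inputtools.mdl.TorqueDataLoader

CHUKWA_HOME is needed by PidFile, which writes TorqueDataLoader.pid under $CHUKWA_HOME/var/run.
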
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java?rev=709533&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueInfoProcessor.java Fri Oct 31 11:57:04 2008
@@ -0,0 +1,541 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.inputtools.mdl;
+
+import java.sql.SQLException;
+import java.sql.ResultSet;
+import java.lang.Exception;
+import java.util.Calendar;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.TreeMap;
+import java.util.Iterator;
+import java.lang.StringBuffer;
+import java.sql.Timestamp;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.lang.Thread;
+import java.util.Timer;
+import java.lang.ProcessBuilder;
+import java.lang.Process;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.InterruptedException;
+import java.lang.System;
+import java.util.Date;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.chukwa.inputtools.mdl.DataConfig;
+import org.apache.hadoop.chukwa.inputtools.mdl.TorqueTimerTask;
+import org.apache.hadoop.chukwa.inputtools.mdl.ErStreamHandler;
+import org.apache.hadoop.chukwa.util.DatabaseWriter;
+
+
+public class TorqueInfoProcessor {
+    
+	private static Log log = LogFactory.getLog(TorqueInfoProcessor.class);
+    
+    private int intervalValue=60;
+	private String torqueServer = null;
+	private String torqueBinDir= null;
+	private String domain = null;
+
+    private TreeMap <String,TreeMap<String,String>> currentHodJobs;
+    
+    
+	public TorqueInfoProcessor(DataConfig mdlConfig, int interval){
+		this.intervalValue=interval;
+		
+		torqueServer=System.getProperty("TORQUE_SERVER");
+		torqueBinDir=System.getProperty("TORQUE_HOME")+File.separator+"bin";
+		domain=System.getProperty("DOMAIN");
+	    currentHodJobs=new TreeMap<String,TreeMap<String,String>>();
+	}
+	
+	
+	
+	public void setup(boolean recover)throws Exception {
+	 }
+		 
+	 private void  getHodJobInfo() throws IOException {
+		 StringBuffer sb=new StringBuffer();
+		 sb.append(torqueBinDir).append("/qstat -a");
+	 
+		 String[] getQueueInfoCommand=new String [3];
+		 getQueueInfoCommand[0]="ssh";
+		 getQueueInfoCommand[1]=torqueServer;
+		 getQueueInfoCommand[2]=sb.toString();
+		
+		 
+         String command=getQueueInfoCommand[0]+" "+getQueueInfoCommand[1]+" "+getQueueInfoCommand[2];
+		 ProcessBuilder pb= new ProcessBuilder(getQueueInfoCommand);
+         
+		 Process p=pb.start();
+		 
+		 Timer timeout=new Timer();
+		 TorqueTimerTask torqueTimerTask=new TorqueTimerTask(p, command);
+		 timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval*1000);
+
+		 BufferedReader result = new BufferedReader (new InputStreamReader (p.getInputStream()));
+		 ErStreamHandler errorHandler=new ErStreamHandler(p.getErrorStream(),command,true);
+		 errorHandler.start();
+		 
+		 String line = null;
+		 boolean start=false;
+         TreeSet<String> jobsInTorque=new TreeSet<String>();
+		 while((line=result.readLine())!=null){
+			 if (line.startsWith("---")){				 
+				 start=true;
+				 continue;
+			 }
+
+			 if(start){
+				 String [] items=line.split("\\s+");
+				 if (items.length>=10){
+				     String hodIdLong=items[0];				     
+				     String hodId=hodIdLong.split("[.]")[0];
+				     String userId=items[1];
+				     String numOfMachine=items[5];
+				     String status=items[9];
+				     jobsInTorque.add(hodId);
+                     if (!currentHodJobs.containsKey(hodId)) {
+                         TreeMap <String,String> aJobData=new TreeMap <String,String>();
+                     
+				         aJobData.put("userId", userId);
+				         aJobData.put("numOfMachine",numOfMachine);
+				         aJobData.put("traceCheckCount","0");
+                         aJobData.put("process", "0");
+				         aJobData.put("status",status);
+				         currentHodJobs.put(hodId,aJobData);
+				     }else {
+				    	 TreeMap <String,String> aJobData= currentHodJobs.get(hodId);
+				    	 aJobData.put("status", status);
+				         currentHodJobs.put(hodId,aJobData);
+				     }//if..else
+				 }				 
+			 }
+		 }//while
+		 
+         try {
+        	 errorHandler.join();
+         }catch (InterruptedException ie){
+        	 log.error(ie.getMessage());
+         }
+		 timeout.cancel();
+		 
+		 Set<String> currentHodJobIds=currentHodJobs.keySet();
+		 Iterator<String> currentHodJobIdsIt=currentHodJobIds.iterator();
+		 TreeSet<String> finishedHodIds=new TreeSet<String>();
+		 while (currentHodJobIdsIt.hasNext()){
+			 String hodId=currentHodJobIdsIt.next();
+			 if (!jobsInTorque.contains(hodId)) {
+				TreeMap <String,String> aJobData=currentHodJobs.get(hodId);
+				String process=aJobData.get("process");
+				if (process.equals("0") || process.equals("1")) {	
+					aJobData.put("status", "C");
+				}else {
+					finishedHodIds.add(hodId);
+				}
+			 }
+		 }//while
+		 
+		 Iterator<String >finishedHodIdsIt=finishedHodIds.iterator();
+		 while (finishedHodIdsIt.hasNext()){
+			 String hodId=finishedHodIdsIt.next();
+			 currentHodJobs.remove(hodId);
+		 }
+		  		 		 	 
+	 }
+	 
+	 private boolean loadQstatData(String hodId) throws IOException, SQLException {
+		 TreeMap<String,String> aJobData=currentHodJobs.get(hodId);
+		 String userId=aJobData.get("userId");
+		 
+		 StringBuffer sb=new StringBuffer();
+		 sb.append(torqueBinDir).append("/qstat -f -1 ").append(hodId);
+		 String[] qstatCommand=new String [3];
+		 qstatCommand[0]="ssh";
+		 qstatCommand[1]=torqueServer;
+		 qstatCommand[2]=sb.toString();
+		 
+         String command=qstatCommand[0]+" "+qstatCommand[1]+" "+qstatCommand[2];
+		 ProcessBuilder pb= new ProcessBuilder(qstatCommand);         
+		 Process p=pb.start();
+		 
+		 Timer timeout=new Timer();
+		 TorqueTimerTask torqueTimerTask=new TorqueTimerTask(p, command);
+		 timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval*1000);
+
+		 BufferedReader result = new BufferedReader (new InputStreamReader (p.getInputStream()));
+		 ErStreamHandler errorHandler=new ErStreamHandler(p.getErrorStream(),command,false);
+		 errorHandler.start();
+		 String line=null;
+         String hosts=null;
+         long startTimeValue=-1;
+         long endTimeValue=Calendar.getInstance().getTimeInMillis();
+         long executeTimeValue=Calendar.getInstance().getTimeInMillis();
+         boolean qstatfinished;
+        
+		 while((line=result.readLine())!=null){
+			 if (line.indexOf("ctime")>=0){
+				 String startTime=line.split("=")[1].trim();
+				 //Tue Sep  9 23:44:29 2008
+				 SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
+				 Date startTimeDate;
+				try {
+					startTimeDate = sdf.parse(startTime);
+					 startTimeValue=startTimeDate.getTime();
+				} catch (ParseException e) {
+					// TODO Auto-generated catch block
+					e.printStackTrace();
+				}
+				 
+			 }
+			 if (line.indexOf("mtime")>=0){
+				 String endTime=line.split("=")[1].trim();
+				 SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
+				 Date endTimeDate;
+				try {
+					endTimeDate = sdf.parse(endTime);
+					endTimeValue=endTimeDate.getTime();
+				} catch (ParseException e) {
+					// TODO Auto-generated catch block
+					e.printStackTrace();
+				}
+				 
+			 }
+			 if (line.indexOf("etime")>=0){
+				 String executeTime=line.split("=")[1].trim();
+				 SimpleDateFormat sdf = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
+				 Date executeTimeDate;
+				try {
+					executeTimeDate = sdf.parse(executeTime);
+					executeTimeValue=executeTimeDate.getTime();
+				} catch (ParseException e) {
+					// TODO Auto-generated catch block
+					e.printStackTrace();
+				}
+				 
+			 }			 
+			 if (line.indexOf("exec_host")>=0){
+				 hosts=line.split("=")[1].trim();
+			 }
+		  }
+		 
+		  if (hosts!=null && startTimeValue>=0) {
+			 String [] items2=hosts.split("[+]"); 
+			 int num=0;
+		     for (int i=0;i<items2.length;i++) {
+		    	 String machinetmp=items2[i];
+		    	 if( machinetmp.length()>3){
+ 			    	 String machine=items2[i].substring(0,items2[i].length()-2);
+            	     StringBuffer data=new StringBuffer();
+            	     data.append("HodId=").append(hodId);
+            	     data.append(", Machine=").append(machine);
+            	     if(domain!=null) {
+            	    	 data.append(".").append(domain);
+            	     }
+            	     log.info(data);
+            	     num++;   
+			      }
+		     } 	 
+			 Timestamp startTimedb=new Timestamp(startTimeValue);
+			 Timestamp endTimedb=new Timestamp(endTimeValue);
+			 StringBuffer data=new StringBuffer();
+			 long timeQueued=executeTimeValue-startTimeValue;
+			 data.append("HodID=").append(hodId);
+			 data.append(", UserId=").append(userId);		
+			 data.append(", StartTime=").append(startTimedb);
+			 data.append(", TimeQueued=").append(timeQueued);
+			 data.append(", NumOfMachines=").append(num);
+			 data.append(", EndTime=").append(endTimedb);
+    	     //log.info(data);
+			 qstatfinished=true;
+			 
+	      } else{
+		   		   
+             qstatfinished=false;
+          }
+		    
+          try {
+        	 errorHandler.join();
+          }catch (InterruptedException ie){
+        	 log.error(ie.getMessage());
+          }
+          result.close();
+          timeout.cancel();
+         	   
+          return qstatfinished;
+	 }
+	
+	 
+	 private boolean loadTraceJobData(String hodId) throws IOException,SQLException{
+		 TreeMap<String,String> aJobData=currentHodJobs.get(hodId);
+		 //String queue=aJobData.get("queue");
+		 String userId=aJobData.get("userId");
+		 String process=aJobData.get("process");
+		 //String numOfMachine=aJobData.get("numOfMachine");
+		 
+		 //StringBuffer traceJobsb=new StringBuffer();
+		 StringBuffer sb=new StringBuffer();
+		 sb.append(torqueBinDir).append("/tracejob -n 10 -l -m -s ").append(hodId);
+	   	 //ProcessBuilder pb= new ProcessBuilder(getQueueInfoCommand.toString());
+		 String[] traceJobCommand=new String [3];
+		 traceJobCommand[0]="ssh";
+		 traceJobCommand[1]=torqueServer;
+		 traceJobCommand[2]=sb.toString();
+		 
+         String command=traceJobCommand[0]+" "+traceJobCommand[1]+" "+traceJobCommand[2];
+		 //System.out.println(command);
+		 ProcessBuilder pb= new ProcessBuilder(traceJobCommand);
+         
+         //String testCommand="/home/lyyang/work/chukwa/src/java/org/apache/hadoop/chukwa/ikit/sleeping";
+         //ProcessBuilder pb= new ProcessBuilder(testCommand);
+		 //pb.redirectErrorStream(false);
+
+		 Process p=pb.start();
+		 
+		 Timer timeout=new Timer();
+		 TorqueTimerTask torqueTimerTask=new TorqueTimerTask(p, command);
+		 timeout.schedule(torqueTimerTask, TorqueTimerTask.timeoutInterval*1000);
+
+		 BufferedReader result = new BufferedReader (new InputStreamReader (p.getInputStream()));
+		 ErStreamHandler errorHandler=new ErStreamHandler(p.getErrorStream(),command,false);
+		 errorHandler.start();
+		 String line=null;
+		 /*
+		 BufferedReader error = new BufferedReader (new InputStreamReader(p.getErrorStream()));
+		 String line = null;
+		 boolean start=false;
+         TreeSet<String> jobsInTorque=new TreeSet<String>();
+         String errorLine = null;;
+         while((errorLine=error.readLine())!=null) {
+        	 //discard the error message;
+        	 ;
+         }
+         */
+         String exit_status=null;
+         String hosts=null;
+         long timeQueued=-1;
+         long startTimeValue=-1;
+         long endTimeValue=-1;
+         boolean findResult=false;
+
+        
+		 while((line=result.readLine())!=null&& ! findResult){
+			 if (line.indexOf("end")>=0 &&line.indexOf("Exit_status")>=0 && line.indexOf("qtime")>=0){
+			      TreeMap <String,String> jobData=new TreeMap <String,String>() ;
+			      String [] items=line.split("\\s+");
+			      for (int i=0;i<items.length; i++) {
+			    	 String [] items2 = items[i].split("=");
+			    	 if (items2.length>=2){
+			    		 jobData.put(items2[0], items2[1]);
+			    	 }
+
+			      }
+	              String startTime=jobData.get("ctime");
+			      startTimeValue=Long.valueOf(startTime);
+			      startTimeValue=startTimeValue-startTimeValue%(60);
+			      Timestamp startTimedb=new Timestamp(startTimeValue*1000);
+			       
+			      String queueTime=jobData.get("qtime");
+			      long queueTimeValue=Long.valueOf(queueTime);
+			      
+			      String sTime=jobData.get("start");
+			      long sTimeValue=Long.valueOf(sTime);
+			      			      
+			      timeQueued=sTimeValue-queueTimeValue;
+			      
+			      String endTime=jobData.get("end");
+			      endTimeValue=Long.valueOf(endTime);
+			      endTimeValue=endTimeValue-endTimeValue%(60);
+			      Timestamp endTimedb=new Timestamp(endTimeValue*1000);
+			      
+			      exit_status=jobData.get("Exit_status");
+			      //if (process.equals("0")){
+			    	  hosts=jobData.get("exec_host");
+			    	  String [] items2=hosts.split("[+]");
+			    	  int num=0;
+			    	  for (int i=0;i<items2.length;i++) {
+			    		  String machinetemp=items2[i];
+			    		  if (machinetemp.length()>=3){
+	            		 
+			    			  String machine=items2[i].substring(0,items2[i].length()-2);
+			    			  StringBuffer data=new StringBuffer();
+			    			  data.append("HodId=").append(hodId);
+			    			  data.append(", Machine=").append(machine);
+		            	      if(domain!=null) {
+			            	   	 data.append(".").append(domain);
+			            	  }
+			    			  log.info(data.toString());
+			    			  num++;
+			    		  }  
+			    	  }
+			      
+			    	  StringBuffer data=new StringBuffer();
+			    	  data.append("HodID=").append(hodId);
+			    	  data.append(", UserId=").append(userId);		
+			    	  data.append(", Status=").append(exit_status);
+			    	  data.append(", TimeQueued=").append(timeQueued);
+			    	  data.append(", StartTime=").append(startTimedb);
+			    	  data.append(", EndTime=").append(endTimedb);
+			    	  data.append(", NumOfMachines=").append(num);
+			          log.info(data.toString());
+//			      } else{
+//			    	  StringBuffer data=new StringBuffer();
+//			    	  data.append("HodID=").append(hodId);
+//			    	  data.append(", TimeQueued=").append(timeQueued);
+//			    	  data.append(", EndTime=").append(endTimedb);
+//			    	  data.append(", Status=").append(exit_status);
+//			    	  log.info(data.toString());
+//			      }
+				  findResult=true;
+		          log.debug(" hod info for job "+hodId+" has been loaded ");
+			 }//if
+			 
+		}//while 
+
+         try {
+        	 errorHandler.join();
+         }catch (InterruptedException ie){
+        	 log.error(ie.getMessage());
+         }
+         
+        timeout.cancel();
+        boolean tracedone=false;
+        if (!findResult){
+        	
+            String traceCheckCount=aJobData.get("traceCheckCount");
+            int traceCheckCountValue=Integer.valueOf(traceCheckCount);
+            traceCheckCountValue=traceCheckCountValue+1;           
+            aJobData.put("traceCheckCount",String.valueOf(traceCheckCountValue));
+
+            
+            log.debug("did not find tracejob info for job "+hodId+", after "+traceCheckCountValue+" times checking");
+            if (traceCheckCountValue>=2){ 
+            	tracedone= true;
+            	
+//                StringBuffer deletesb1=new StringBuffer();
+//                deletesb1.append(" Delete from ").append(hodJobTable);
+//                deletesb1.append(" where hodid='").append(hodId).append("'");
+//                String delete1=deletesb1.toString();
+//                
+////                dbWriter.execute(delete1);
+//                
+//                StringBuffer deletesb2=new StringBuffer();
+//                deletesb2.append(" Delete from  ").append(hodMachineTable);
+//                deletesb2.append(" where hodid='").append(hodId).append("'");
+//                String delete2=deletesb2.toString();
+////                dbWriter.execute(delete2);
+            }
+        }
+        boolean finished=findResult|tracedone;
+       
+	   
+        return finished;
+      
+    //  return true;   
+	 }
+	 
+		 
+		 
+	 private void process_data() throws SQLException{
+	
+		 long currentTime=System.currentTimeMillis();
+		 currentTime=currentTime-currentTime%(60*1000);
+		 Timestamp timestamp=new Timestamp(currentTime);
+		 
+		 Set<String> hodIds=currentHodJobs.keySet();
+		 
+		 Iterator<String> hodIdsIt=hodIds.iterator();
+		 while (hodIdsIt.hasNext()){
+			 String hodId=(String) hodIdsIt.next();
+			 TreeMap<String,String> aJobData=currentHodJobs.get(hodId);
+			 //String queue=aJobData.get("queue");
+			 //String numOfMachine=aJobData.get("numOfMachine");
+			 String status=aJobData.get("status");
+			 String process=aJobData.get("process");
+			 if (process.equals("0") && (status.equals("R") ||status.equals("E"))){
+			     try {
+			    	 boolean result=loadQstatData(hodId);
+			    	 if (result){
+			    		 aJobData.put("process","1");
+				    	 currentHodJobs.put(hodId, aJobData);			    		 
+			    	 }
+			     }catch (IOException ioe){
+			    	 log.error("load qstat data Error:"+ioe.getMessage());
+			    	  
+			     }
+			 }
+			 if (! process.equals("2") && status.equals("C")){
+				 try {
+			    	boolean result=loadTraceJobData(hodId);
+			    	
+			    	if (result){
+			    		aJobData.put("process","2");
+			    		currentHodJobs.put(hodId, aJobData);
+			    	}
+				 }catch (IOException ioe){
+					 log.error("loadTraceJobData Error:"+ioe.getMessage());
+				 }
+			 }//if
+			
+			 
+		 } //while
+		 
+	 }
+	 
+	 
+	 private void handle_jobData() throws SQLException{		 
+		 try{
+		     getHodJobInfo();
+		 }catch (IOException ex){
+			 log.error("getQueueInfo Error:"+ex.getMessage());
+			 return;
+		 }
+		 try{    
+	         process_data();
+		 } catch (SQLException ex){
+			 log.error("process_data Error:"+ex.getMessage());
+			 throw ex;
+		 }
+	 }
+     
+	 
+	 public void run_forever() throws SQLException{    	            
+     	  while(true){
+          	  handle_jobData();
+              try {
+                  log.debug("sleeping ...");
+                  Thread.sleep(this.intervalValue*1000);
+              } catch (InterruptedException e) {
+                  log.error(e.getMessage()); 	
+              }
+          }
+     }
+     
+	 
+	 public void shutdown(){
+     }
+   	  
+	
+
+}

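TorqueInfoProcessor tracks each HOD job through a small state machine kept in the "process" field of currentHodJobs: "0" when first seen in qstat -a, "1" once loadQstatData() has logged its machine list, and "2" once loadTraceJobData() has logged the final accounting record. Both loaders shell out over ssh and parse whitespace-separated key=value fields; a sketch of just that parsing step, on an invented tracejob "end" line:

    import java.util.TreeMap;

    public class TraceLineParseDemo {
      public static void main(String[] args) {
        // invented sample; real lines come from `tracejob -n 10 -l -m -s <hodId>`
        String line = "ctime=1225476000 qtime=1225476005 start=1225476060 "
                    + "end=1225476600 Exit_status=0 exec_host=node1/0+node2/0";
        TreeMap<String, String> jobData = new TreeMap<String, String>();
        for (String item : line.split("\\s+")) {           // whitespace-separated fields
          String[] kv = item.split("=");
          if (kv.length >= 2) {
            jobData.put(kv[0], kv[1]);                     // keep only key=value pairs
          }
        }
        long timeQueued = Long.valueOf(jobData.get("start"))
                        - Long.valueOf(jobData.get("qtime"));
        System.out.println("TimeQueued=" + timeQueued
                         + ", Exit_status=" + jobData.get("Exit_status"));
      }
    }
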
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java?rev=709533&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/mdl/TorqueTimerTask.java Fri Oct 31 11:57:04 2008
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.inputtools.mdl;
+
+import java.util.TimerTask;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class TorqueTimerTask extends TimerTask{
+	private Process ps=null;
+	private String command;
+	
+	private static Log log = LogFactory.getLog(TorqueTimerTask.class);
+    //public static int timeoutInterval=300;
+    public static int timeoutInterval=180;
+    
+	public TorqueTimerTask() {
+		super();
+		// TODO Auto-generated constructor stub
+	}
+	
+	public  TorqueTimerTask(Process process,String command){
+    	super();
+    	this.ps=process;
+    	this.command=command;
+    	
+    }
+	
+	public void run() {
+		ps.destroy();
+	    log.error("torque command: "+command+" timed out");
+		
+	}
+
+}

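TorqueTimerTask is the watchdog for those ssh calls: scheduled on a java.util.Timer, it destroys the child process if run() fires before the caller cancels it. The schedule/cancel shape used throughout TorqueInfoProcessor looks like this in isolation (the command is illustrative):

    import java.util.Timer;
    import org.apache.hadoop.chukwa.inputtools.mdl.TorqueTimerTask;

    public class WatchdogDemo {
      public static void main(String[] args) throws Exception {
        Process p = new ProcessBuilder("sleep", "600").start();
        Timer timeout = new Timer();
        // destroy the child if it outlives timeoutInterval seconds
        timeout.schedule(new TorqueTimerTask(p, "sleep 600"),
                         TorqueTimerTask.timeoutInterval * 1000);
        p.waitFor();      // returns early if the watchdog kills the process
        timeout.cancel(); // disarm once the process has exited
      }
    }
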
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Exec.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Exec.java?rev=709533&r1=709532&r2=709533&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Exec.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Exec.java Fri Oct 31 11:57:04 2008
@@ -1,47 +1,67 @@
 package org.apache.hadoop.chukwa.inputtools.plugin.metrics;
 
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Timer;
+import java.util.TimerTask;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.chukwa.inputtools.plugin.ExecPlugin;
 import org.apache.hadoop.chukwa.inputtools.plugin.IPlugin;
+import org.apache.hadoop.chukwa.util.PidFile;
 import org.json.JSONException;
 import org.json.JSONObject;
 
-public class Exec extends ExecPlugin
-{
+public class Exec extends TimerTask {
 	private static Log log = LogFactory.getLog(Exec.class);
 	private String cmde = null;
-	
-	public Exec(String[] cmds)
-	{
+    private static PidFile pFile = null;
+    private Timer timer = null;
+    private IPlugin plugin = null;
+    
+	public Exec(String[] cmds) {
 		StringBuffer c = new StringBuffer();
 		for(String cmd : cmds) {
 			c.append(cmd);
 			c.append(" ");
 		}
 		cmde = c.toString();
+		plugin = new ExecHelper(cmds);
+	}
+	public void run() {
+		try {
+			JSONObject result = plugin.execute();
+			if (result.getInt("status") < 0) {
+				System.out.println("Error");
+				log.warn("[ChukwaError]:"+ Exec.class + ", " + result.getString("stderr"));
+				System.exit(-1);
+			} else {
+				log.info(result.get("stdout"));
+			}
+		} catch(JSONException e) {
+			log.error("Exec output unparsable:"+this.cmde);
+		}
 	}
-	
-	@Override
-	public String getCmde()
-	{
+	public String getCmde() {
 		return cmde;
 	}
-
-	public static void main(String[] args) throws JSONException
-	{
-		IPlugin plugin = new Exec(args);
-		JSONObject result = plugin.execute();		
-		if (result.getInt("status") < 0)
-		{
-			System.out.println("Error");
-			log.warn("[ChukwaError]:"+ Exec.class + ", " + result.getString("stderr"));
-			System.exit(-1);
-		}
-		else
-		{
-			log.info(result.get("stdout"));
-		}
-		System.exit(0);
+    
+	public static void main(String[] args) {
+   	    pFile=new PidFile(System.getProperty("RECORD_TYPE")+"-data-loader");
+   	    Runtime.getRuntime().addShutdownHook(pFile);
+   	    int period = 60;
+   	    try {
+			if(System.getProperty("PERIOD")!=null) {
+			    period = Integer.parseInt(System.getProperty("PERIOD"));
+			}
+        } catch(NumberFormatException ex) {
+			ex.printStackTrace();
+			System.out.println("Usage: java -DPERIOD=nn -DRECORD_TYPE=recordType Exec [cmd]");
+			System.out.println("PERIOD should be a number of seconds.");
+			System.exit(0);
+        }
+   	    Timer timer = new Timer();
+		timer.schedule(new Exec(args),0, period*1000);
 	}
 }

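Exec is reworked from a run-once ExecPlugin into a periodic TimerTask: main() now writes a pid file, registers it as a shutdown hook, and reschedules the wrapped command every PERIOD seconds, delegating the actual execution to the new ExecHelper below. Following the usage string in the code, an invocation might look like this (record type and command are placeholders):

    java -DPERIOD=60 -DRECORD_TYPE=Iostat \
         org.apache.hadoop.chukwa.inputtools.plugin.metrics.Exec iostat -x 60
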
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/ExecHelper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/ExecHelper.java?rev=709533&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/ExecHelper.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/ExecHelper.java Fri Oct 31 11:57:04 2008
@@ -0,0 +1,34 @@
+package org.apache.hadoop.chukwa.inputtools.plugin.metrics;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.chukwa.inputtools.plugin.ExecPlugin;
+import org.apache.hadoop.chukwa.inputtools.plugin.IPlugin;
+import org.apache.hadoop.chukwa.util.PidFile;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+public class ExecHelper extends ExecPlugin {
+	private static Log log = LogFactory.getLog(ExecHelper.class);
+	private String cmde = null;
+    private static PidFile pFile = null;
+    private Timer timer = null;
+    
+	public ExecHelper(String[] cmds) {
+		StringBuffer c = new StringBuffer();
+		for(String cmd : cmds) {
+			c.append(cmd);
+			c.append(" ");
+		}
+		cmde = c.toString();
+	}
+	
+	public String getCmde() {
+		return cmde;
+	}    
+}

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Log4JMetricsContext.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Log4JMetricsContext.java?rev=709533&r1=709532&r2=709533&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Log4JMetricsContext.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/inputtools/plugin/metrics/Log4JMetricsContext.java Fri Oct 31 11:57:04 2008
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.chukwa.inputtools.plugin.metrics;
-
-import java.io.*;
-
-import org.apache.hadoop.metrics.ContextFactory;
-import org.apache.hadoop.metrics.MetricsException;
-import org.apache.hadoop.metrics.spi.AbstractMetricsContext;
-import org.apache.hadoop.metrics.spi.OutputRecord;
-import org.apache.log4j.Logger;
-
-public class Log4JMetricsContext extends AbstractMetricsContext {
-
-  static Logger out = Logger.getLogger(Log4JMetricsContext.class);
-  
-  /* Configuration attribute names */
-//  protected static final String FILE_NAME_PROPERTY = "fileName";
-  protected static final String PERIOD_PROPERTY = "period";
-
-    
-  /** Creates a new instance of FileContext */
-  public Log4JMetricsContext() {}
-     
-  public void init(String contextName, ContextFactory factory) {
-    super.init(contextName, factory);
-  /*      
-    String fileName = getAttribute(FILE_NAME_PROPERTY);
-    if (fileName != null) {
-      file = new File(fileName);
-    }
-    */    
-    String periodStr = getAttribute(PERIOD_PROPERTY);
-    if (periodStr != null) {
-      int period = 0;
-      try {
-        period = Integer.parseInt(periodStr);
-      } catch (NumberFormatException nfe) {
-      }
-      if (period <= 0) {
-        throw new MetricsException("Invalid period: " + periodStr);
-      }
-      setPeriod(period);
-    }
-  }
-  
-  @Override
-  protected void emitRecord(String contextName, String recordName, OutputRecord outRec)
-      throws IOException
-  {
-    StringBuilder writer = new StringBuilder();
-    String separator = " ";
-    writer.append("contextName=");
-    writer.append(contextName);
-    
-    writer.append(separator);
-    writer.append("recordName=");
-    writer.append(recordName);
-    
-
-    writer.append(separator);
-    writer.append("chukwa_timestamp="+ System.currentTimeMillis());
-    writer.append(recordName);
-    
-    for (String tagName : outRec.getTagNames()) {
-      writer.append(separator);
-      writer.append(tagName);
-      writer.append("=");
-      writer.append(outRec.getTag(tagName));
-    }
-    for (String metricName : outRec.getMetricNames()) {
-      writer.append(separator);
-      writer.append(metricName);
-      writer.append("=");
-      writer.append(outRec.getMetric(metricName));
-    }
-    
-    out.info(writer.toString());
-//    out.println(writer);
-  }
-
-}

Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/ClusterConfig.java
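This deletion is the counterpart of the Log4JMetricsContext added under inputtools/log4j above: the old context built a space-separated key=value line (including a stray second append of recordName after the timestamp), while the new one emits a single JSON object per record. For the same hypothetical record the output changes roughly like this:

    old: contextName=dfs recordName=namenode chukwa_timestamp=1225476000000 hostName=node1 ...
    new: {"contextName":"dfs","recordName":"namenode","chukwa_timestamp":1225476000000,"hostName":"node1",...}
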
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/ClusterConfig.java?rev=709533&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/ClusterConfig.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/ClusterConfig.java Fri Oct 31 11:57:04 2008
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.util;
+
+import java.io.*;
+import java.util.*;
+
+public class ClusterConfig {
+    public static HashMap<String, String> clusterMap = new HashMap<String, String>();
+    private String path=System.getenv("CHUKWA_CONF_DIR")+File.separator;
+    static public String getContents(File aFile) {
+        //...checks on aFile are elided
+        StringBuffer contents = new StringBuffer();
+   
+        try {
+          //use buffering, reading one line at a time
+          //FileReader always assumes default encoding is OK!
+          BufferedReader input =  new BufferedReader(new FileReader(aFile));
+          try {
+             String line = null; //not declared within while loop
+             /*
+              * readLine is a bit quirky :
+              * it returns the content of a line MINUS the newline.
+              * it returns null only for the END of the stream.
+              * it returns an empty String if two newlines appear in a row.
+              */
+             while (( line = input.readLine()) != null){
+                contents.append(line);
+                contents.append(System.getProperty("line.separator"));
+             }
+          } finally {
+             input.close();
+          }
+        }
+          catch (IOException ex){
+          ex.printStackTrace();
+        }
+
+        return contents.toString();
+    }
+
+    public ClusterConfig() {
+        File cc = new File(path+"jdbc.conf");
+        String buffer = getContents(cc);
+        String[] lines = buffer.split("\n");
+        for(String line: lines) {
+            String[] data = line.split("=",2);
+            clusterMap.put(data[0],data[1]);
+        }
+    }
+
+    public String getURL(String cluster) {
+        String url = clusterMap.get(cluster);
+        return url; 
+    }
+
+    public Iterator<String> getClusters() {
+        Set<String> keys = clusterMap.keySet();
+        Iterator<String> i = keys.iterator();
+        return i;
+    }    
+}

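ClusterConfig reads $CHUKWA_CONF_DIR/jdbc.conf and splits each line once on "=", so the expected format is one cluster-to-JDBC-URL mapping per line. A hypothetical jdbc.conf (names and URLs invented):

    demo=jdbc:mysql://db.example.com:3306/chukwa?user=chukwa
    research=jdbc:mysql://db2.example.com:3306/chukwa_research?user=chukwa

getURL("demo") would then return the first URL, and getClusters() iterates over the cluster names.
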
Added: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/PidFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/PidFile.java?rev=709533&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/PidFile.java (added)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/util/PidFile.java Fri Oct 31 11:57:04 2008
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.util;
+
+import java.io.IOException;
+import java.io.File;
+import java.io.*;
+import java.lang.management.ManagementFactory;
+import java.nio.channels.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+public class PidFile extends Thread {
+	
+	String name;
+	private static Log log = LogFactory.getLog(PidFile.class);
+	private static FileLock lock = null;
+        private static FileOutputStream pidFileOutput = null;
+	
+	public PidFile(String name){
+		this.name=name;
+		try {
+		    init();
+		} catch(IOException ex) {
+			clean();
+			System.exit(-1);
+		}
+	}
+	
+	public void init() throws IOException{
+  	     String pidLong=ManagementFactory.getRuntimeMXBean().getName();
+  	     String[] items=pidLong.split("@");
+  	     String pid=items[0];
+	     String chukwaPath=System.getProperty("CHUKWA_HOME");
+	     StringBuffer pidFilesb=new StringBuffer();
+	     String pidDir = chukwaPath+File.separator+"var"+File.separator+"run";
+	     pidFilesb.append(pidDir).append(File.separator).append(name).append(".pid");
+	     try{
+	    	 File existsFile = new File(pidDir);
+	    	 if(!existsFile.exists()) {
+		    	 boolean success = (new File(pidDir)).mkdirs();
+		    	 if(!success) {
+		    		 throw(new IOException());
+		    	 }
+	    	 }
+	         File pidFile= new File(pidFilesb.toString());
+
+	         pidFileOutput= new FileOutputStream(pidFile);
+             pidFileOutput.write(pid.getBytes());
+	         pidFileOutput.flush();
+	         FileChannel channel = pidFileOutput.getChannel();
+	         PidFile.lock = channel.tryLock();
+             if(PidFile.lock!=null) {
+	             log.debug("Initialization succeeded...");
+             } else {
+                 throw(new IOException());
+             }
+	     }catch (IOException ex){
+	    	 System.out.println("Initialization failed: cannot write pid file.");
+	    	 log.error("Initialization failed...");
+	    	 log.error(ex.getMessage());
+	    	 System.exit(-1);
+	    	 throw ex;
+	    	 
+	     }
+	   
+	}	
+	
+	public void clean(){
+        String chukwaPath=System.getenv("CHUKWA_HOME");
+        StringBuffer pidFilesb=new StringBuffer();
+        pidFilesb.append(chukwaPath).append("/var/run/").append(name).append(".pid"); 
+        String pidFileName=pidFilesb.toString();
+
+        File pidFile=new File(pidFileName);
+        if (!pidFile.exists()) {
+    	   log.error("Cannot delete pid file, no such file or directory: "+pidFileName);
+        } else {
+           try {
+               lock.release();
+	       pidFileOutput.close();
+           } catch(IOException e) {
+               log.error("Unable to release file lock: "+pidFileName);
+           }
+        }
+
+        boolean result=pidFile.delete();
+        if (!result){
+    	   log.error("Failed to delete pid file: "+pidFileName);
+        }
+	}
+
+	public void run() {
+		clean();
+	}
+}