Posted to commits@chukwa.apache.org by as...@apache.org on 2009/03/11 23:39:32 UTC

svn commit: r752666 [3/16] - in /hadoop/chukwa/trunk: ./ src/java/org/apache/hadoop/chukwa/ src/java/org/apache/hadoop/chukwa/conf/ src/java/org/apache/hadoop/chukwa/database/ src/java/org/apache/hadoop/chukwa/datacollection/ src/java/org/apache/hadoop...

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/CharFileTailingAdaptorUTF8.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/CharFileTailingAdaptorUTF8.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/CharFileTailingAdaptorUTF8.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/CharFileTailingAdaptorUTF8.java Wed Mar 11 22:39:26 2009
@@ -18,59 +18,57 @@
 
 package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
 
+
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
 import java.util.ArrayList;
 
 /**
- * A subclass of FileTailingAdaptor that reads UTF8/ascii
- * files and splits records at carriage returns.
- *
+ * A subclass of FileTailingAdaptor that reads UTF8/ascii files and splits
+ * records at newlines.
+ * 
  */
 public class CharFileTailingAdaptorUTF8 extends FileTailingAdaptor {
-  
-
 
   private static final char SEPARATOR = '\n';
-  
+
   private ArrayList<Integer> offsets = new ArrayList<Integer>();
-  
+
   /**
    * 
   * Note: this method uses a temporary ArrayList (reused across calls).
    * This means we're copying ints each time. This could be a performance issue.
-   * Also, 'offsets' never shrinks, and will be of size proportional to the 
+   * Also, 'offsets' never shrinks, and will be of size proportional to the
    * largest number of lines ever seen in an event.
    */
   @Override
-    protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile, byte[] buf)
-    throws InterruptedException
-  {
-      for(int i = 0; i < buf.length; ++i) {
-        if(buf[i] == SEPARATOR) {
-          offsets.add(i);
-        }
+  protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile,
+      byte[] buf) throws InterruptedException {
+    for (int i = 0; i < buf.length; ++i) {
+      if (buf[i] == SEPARATOR) {
+        offsets.add(i);
       }
+    }
+
+    if (offsets.size() > 0) {
+      int[] offsets_i = new int[offsets.size()];
+      for (int i = 0; i < offsets_i.length; ++i)
+        offsets_i[i] = offsets.get(i);
+
+      int bytesUsed = offsets_i[offsets_i.length - 1] + 1; // char at last
+                                                           // offset uses a byte
+      assert bytesUsed > 0 : " shouldn't send empty events";
+      ChunkImpl event = new ChunkImpl(type, toWatch.getAbsolutePath(),
+          buffOffsetInFile + bytesUsed, buf, this);
+
+      event.setRecordOffsets(offsets_i);
+      eq.add(event);
+
+      offsets.clear();
+      return bytesUsed;
+    } else
+      return 0;
 
-      if(offsets.size() > 0)  {
-        int[] offsets_i = new int[offsets.size()];
-        for(int i = 0; i < offsets_i.length ; ++i)
-          offsets_i[i] = offsets.get(i);
-      
-        int bytesUsed = offsets_i[offsets_i.length-1]  + 1; //char at last offset uses a byte
-        assert bytesUsed > 0: " shouldn't send empty events";
-        ChunkImpl event = new ChunkImpl(type, toWatch.getAbsolutePath(),buffOffsetInFile + bytesUsed, buf, this );
-
-        event.setRecordOffsets(offsets_i);
-        eq.add(event);
-        
-        offsets.clear();
-        return bytesUsed;
-      }
-      else
-        return 0;
-      
   }
 
-  
 }
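
As context for the reformatted method above: extractRecords() scans the buffer for '\n', records each separator position, and reports everything up to and including the last separator as consumed; a trailing partial line stays in the file for the next pass. A minimal, self-contained sketch of that contract, assuming nothing beyond the JDK (class and method names here are illustrative, not Chukwa API):

    import java.util.ArrayList;

    public class NewlineSplitSketch {
      private static final char SEPARATOR = '\n';

      /** Byte offsets of each separator in buf; bytes past the last
       *  separator are left for the next read, as in extractRecords(). */
      static int[] findRecordOffsets(byte[] buf) {
        ArrayList<Integer> offsets = new ArrayList<Integer>();
        for (int i = 0; i < buf.length; ++i)
          if (buf[i] == SEPARATOR)
            offsets.add(i);
        int[] out = new int[offsets.size()];
        for (int i = 0; i < out.length; ++i)
          out[i] = offsets.get(i);
        return out;
      }

      public static void main(String[] args) {
        byte[] buf = "first\nsecond\npartial".getBytes();
        int[] offsets = findRecordOffsets(buf);
        int bytesUsed = offsets.length > 0 ? offsets[offsets.length - 1] + 1 : 0;
        // prints 13: "partial" is not consumed until its newline arrives
        System.out.println("bytesUsed = " + bytesUsed);
      }
    }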

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/CharFileTailingAdaptorUTF8NewLineEscaped.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/CharFileTailingAdaptorUTF8NewLineEscaped.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/CharFileTailingAdaptorUTF8NewLineEscaped.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/CharFileTailingAdaptorUTF8NewLineEscaped.java Wed Mar 11 22:39:26 2009
@@ -18,74 +18,73 @@
 
 package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
 
+
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
-import org.apache.hadoop.chukwa.util.RecordConstants; 
-
+import org.apache.hadoop.chukwa.util.RecordConstants;
 import java.util.ArrayList;
 
-
 /**
- * A subclass of FileTailingAdaptor that reads UTF8/ascii
- * files and splits records at non-escaped carriage returns
- *
+ * A subclass of FileTailingAdaptor that reads UTF8/ascii files and splits
+ * records at non-escaped newlines.
+ * 
  */
-public class CharFileTailingAdaptorUTF8NewLineEscaped extends FileTailingAdaptor {
-  
+public class CharFileTailingAdaptorUTF8NewLineEscaped extends
+    FileTailingAdaptor {
 
   private static final char SEPARATOR = '\n';
-  
+
   private ArrayList<Integer> offsets = new ArrayList<Integer>();
-  
+
   /**
    * 
   * Note: this method uses a temporary ArrayList (reused across calls).
    * This means we're copying ints each time. This could be a performance issue.
-   * Also, 'offsets' never shrinks, and will be of size proportional to the 
+   * Also, 'offsets' never shrinks, and will be of size proportional to the
    * largest number of lines ever seen in an event.
    */
   @Override
-    protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile, byte[] buf)
-    throws InterruptedException
-  {
+  protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile,
+      byte[] buf) throws InterruptedException {
     String es = RecordConstants.RECORD_SEPARATOR_ESCAPE_SEQ;
-    for(int i = 0; i < buf.length; ++i) {
-        // if this is a separator
-      if(buf[i] == SEPARATOR){
+    for (int i = 0; i < buf.length; ++i) {
+      // if this is a separator
+      if (buf[i] == SEPARATOR) {
         // if possibly preceded by escape sequence (avoid outOfBounds here)
         boolean escaped = false; // was it escaped?
-        if (i-es.length() >= 0){
-          escaped = true; // maybe (at least there was room for an escape sequence, so let's check for the e.s.)
-          for (int j = 0; j < es.length(); j++){
-            if (buf[i-es.length()+j] != es.charAt(j)){
+        if (i - es.length() >= 0) {
+          escaped = true; // maybe (at least there was room for an escape
+                          // sequence, so let's check for the e.s.)
+          for (int j = 0; j < es.length(); j++) {
+            if (buf[i - es.length() + j] != es.charAt(j)) {
               escaped = false;
             }
           }
         }
-        if (!escaped){
+        if (!escaped) {
           offsets.add(i);
         }
       }
     }
 
-    if(offsets.size() > 0)  {
+    if (offsets.size() > 0) {
       int[] offsets_i = new int[offsets.size()];
-      for(int i = 0; i < offsets_i.length ; ++i)
+      for (int i = 0; i < offsets_i.length; ++i)
         offsets_i[i] = offsets.get(i);
-      //make the stream unique to this adaptor 
-      int bytesUsed = offsets_i[offsets_i.length-1]  + 1; //char at last offset uses a byte
-      assert bytesUsed > 0: " shouldn't send empty events";
+      // make the stream unique to this adaptor
+      int bytesUsed = offsets_i[offsets_i.length - 1] + 1; // char at last
+                                                           // offset uses a byte
+      assert bytesUsed > 0 : " shouldn't send empty events";
       ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(),
-           buffOffsetInFile + bytesUsed, buf,this);
-      
+          buffOffsetInFile + bytesUsed, buf, this);
+
       chunk.setSeqID(buffOffsetInFile + bytesUsed);
       chunk.setRecordOffsets(offsets_i);
       eq.add(chunk);
-      
+
       offsets.clear();
       return bytesUsed;
-    }
-    else
+    } else
       return 0;
   }
 
@@ -93,5 +92,4 @@
     return "escaped newline CFTA-UTF8";
   }
 
-  
 }
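
The only difference from the base class is the escape check inside the scan: a '\n' counts as a record boundary only if the bytes immediately before it do not spell out RecordConstants.RECORD_SEPARATOR_ESCAPE_SEQ. A hedged standalone sketch of just that test (the two-character escape string "\\n" below is an assumed stand-in for the real constant):

    public class EscapedSeparatorSketch {
      /** True if the separator at index i is immediately preceded by es. */
      static boolean isEscaped(byte[] buf, int i, String es) {
        if (i - es.length() < 0)
          return false; // no room for an escape sequence before i
        for (int j = 0; j < es.length(); j++)
          if (buf[i - es.length() + j] != es.charAt(j))
            return false;
        return true;
      }

      public static void main(String[] args) {
        String es = "\\n"; // backslash + 'n', illustrative only
        byte[] buf = "a\\n\nb\n".getBytes(); // bytes: a \ n LF b LF
        for (int i = 0; i < buf.length; i++)
          if (buf[i] == '\n')
            System.out.println("separator at " + i
                + (isEscaped(buf, i, es) ? ": escaped, skipped"
                                         : ": record boundary"));
      }
    }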

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java Wed Mar 11 22:39:26 2009
@@ -18,9 +18,9 @@
 
 package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
 
+
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
-
 import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
 import org.apache.hadoop.chukwa.datacollection.DataFactory;
 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
@@ -28,57 +28,59 @@
 import org.apache.log4j.Logger;
 
 /**
- * A shared thread used by all FileTailingAdaptors. 
+ * A shared thread used by all FileTailingAdaptors.
  * 
- * For now, it tries each file in succession. If it gets through every
- * file within two seconds, and no more data remains, it will sleep.
+ * For now, it tries each file in succession. If it gets through every file
+ * within two seconds, and no more data remains, it will sleep.
  * 
  * If there was still data available in any file, the adaptor will loop again.
- *
+ * 
  */
 class FileTailer extends Thread {
   static Logger log = Logger.getLogger(FileTailer.class);
-  
-  private  List<FileTailingAdaptor> adaptors;
+
+  private List<FileTailingAdaptor> adaptors;
   private volatile boolean isRunning = true;
-  ChunkQueue eq; //not private -- useful for file tailing adaptor classes
-  
+  ChunkQueue eq; // not private -- useful for file tailing adaptor classes
+
   /**
    * How often to tail each file.
    */
-  int DEFAULT_SAMPLE_PERIOD_MS = 1000* 2;
+  int DEFAULT_SAMPLE_PERIOD_MS = 1000 * 2;
   int SAMPLE_PERIOD_MS = DEFAULT_SAMPLE_PERIOD_MS;
   private static Configuration conf = null;
-  
+
   FileTailer() {
-	if (conf == null) {
-	  ChukwaAgent agent = ChukwaAgent.getAgent();
-	  if (agent != null) {
-		conf = agent.getConfiguration();
-    	if (conf != null) {
-    	  SAMPLE_PERIOD_MS= conf.getInt("chukwaAgent.adaptor.context.switch.time", DEFAULT_SAMPLE_PERIOD_MS);
+    if (conf == null) {
+      ChukwaAgent agent = ChukwaAgent.getAgent();
+      if (agent != null) {
+        conf = agent.getConfiguration();
+        if (conf != null) {
+          SAMPLE_PERIOD_MS = conf.getInt(
+              "chukwaAgent.adaptor.context.switch.time",
+              DEFAULT_SAMPLE_PERIOD_MS);
         }
-	  }
-	}
+      }
+    }
     eq = DataFactory.getInstance().getEventQueue();
-     
-    //iterations are much more common than adding a new adaptor
+
+    // iterations are much more common than adding a new adaptor
     adaptors = new CopyOnWriteArrayList<FileTailingAdaptor>();
 
     this.setDaemon(true);
-    start();//start the file-tailing thread
+    start();// start the file-tailing thread
+  }
+
+  // called by FileTailingAdaptor, only
+  void startWatchingFile(FileTailingAdaptor f) {
+    adaptors.add(f);
   }
-   
-  //called by FileTailingAdaptor, only
-   void startWatchingFile(FileTailingAdaptor f) {
-     adaptors.add(f);
-   }
-
-   //called by FileTailingAdaptor, only
-   void stopWatchingFile(FileTailingAdaptor f) {
-     adaptors.remove(f);
-   }
-   
+
+  // called by FileTailingAdaptor, only
+  void stopWatchingFile(FileTailingAdaptor f) {
+    adaptors.remove(f);
+  }
+
   public void run() {
     while (isRunning) {
       try {
@@ -98,6 +100,5 @@
       }
     }
   }
-  
-  
+
 }
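
Two small design points in FileTailer are worth calling out. The adaptor list is a CopyOnWriteArrayList because, per the comment in the diff, the tail loop iterates it every couple of seconds while adaptors are added or removed rarely; copy-on-write makes the hot path lock-free at the cost of the rare mutation. The constructor also shows the configuration idiom used throughout this commit: consult the agent's Configuration if one is available, otherwise keep a compiled-in default. A minimal sketch of that idiom, assuming only hadoop-core on the classpath (the property name is the real one from the diff; the standalone main() is illustrative):

    import org.apache.hadoop.conf.Configuration;

    public class ConfigDefaultSketch {
      static final int DEFAULT_SAMPLE_PERIOD_MS = 1000 * 2;

      public static void main(String[] args) {
        Configuration conf = new Configuration(); // stand-in for agent.getConfiguration()
        int samplePeriodMs = conf.getInt(
            "chukwaAgent.adaptor.context.switch.time", DEFAULT_SAMPLE_PERIOD_MS);
        System.out.println("tail pass every " + samplePeriodMs + " ms");
      }
    }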

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java Wed Mar 11 22:39:26 2009
@@ -18,12 +18,12 @@
 
 package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
 
+
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
@@ -34,298 +34,313 @@
 
 /**
  * An adaptor that repeatedly tails a specified file, sending the new bytes.
- * This class does not split out records, but just sends everything up to end of file.
- * Subclasses can alter this behavior by overriding extractRecords().
+ * This class does not split out records, but just sends everything up to end of
+ * file. Subclasses can alter this behavior by overriding extractRecords().
  * 
  */
-public class FileTailingAdaptor implements Adaptor
-{
+public class FileTailingAdaptor implements Adaptor {
+
+  static Logger log;
+
+  /**
+   * This is the maximum amount we'll read from any one file before moving on to
+   * the next. This way, we get quick response time for other files if one file
+   * is growing rapidly.
+   */
+  public static final int DEFAULT_MAX_READ_SIZE = 128 * 1024;
+  public static int MAX_READ_SIZE = DEFAULT_MAX_READ_SIZE;
+  public static int MAX_RETRIES = 300;
+  public static int GRACEFUL_PERIOD = 3 * 60 * 1000; // 3 minutes
+
+  protected static Configuration conf = null;
+  private int attempts = 0;
+  private long gracefulPeriodExpired = 0l;
+  private boolean adaptorInError = false;
+  File toWatch;
+  /**
+   * next PHYSICAL offset to read
+   */
+  protected long fileReadOffset;
+  protected String type;
+  private ChunkReceiver dest;
+  protected RandomAccessFile reader = null;
+  protected long adaptorID;
+
+  /**
+   * The logical offset of the first byte of the file
+   */
+  private long offsetOfFirstByte = 0;
+
+  private static FileTailer tailer;
+
+  static {
+    tailer = new FileTailer();
+    log = Logger.getLogger(FileTailingAdaptor.class);
+  }
 
-	static Logger log;
+  public void start(long adaptorID, String type, String params, long bytes,
+      ChunkReceiver dest) {
+    // in this case params = filename
+    this.adaptorID = adaptorID;
+    this.type = type;
+    this.dest = dest;
+    this.attempts = 0;
+
+    Pattern cmd = Pattern.compile("(\\d+)\\s+(.+)\\s?");
+    Matcher m = cmd.matcher(params);
+    if (m.matches()) {
+      offsetOfFirstByte = Long.parseLong(m.group(1));
+      toWatch = new File(m.group(2));
+    } else {
+      toWatch = new File(params.trim());
+    }
+    log.info("started file tailer on file " + toWatch
+        + " with first byte at offset " + offsetOfFirstByte);
 
-	/**
-	 * This is the maximum amount we'll read from any one file before moving on
-	 * to the next. This way, we get quick response time for other files if one
-	 * file is growing rapidly.
-	 */
-	public static final int DEFAULT_MAX_READ_SIZE = 128 * 1024 ;
-	public static int MAX_READ_SIZE = DEFAULT_MAX_READ_SIZE ;
-	public static int MAX_RETRIES = 300;
-	public static int GRACEFUL_PERIOD = 3 * 60 * 1000; // 3 minutes
-  
-	protected static Configuration conf = null;
-	private int attempts = 0;
-	private long gracefulPeriodExpired = 0l;
-	private boolean adaptorInError = false;
-	File toWatch;
-	/**
-	 * next PHYSICAL offset to read
-	 */
-	protected long fileReadOffset;
-	protected String type;
-	private ChunkReceiver dest;
-	protected RandomAccessFile reader = null;
-	protected long adaptorID;
-	
-	/**
-	 * The logical offset of the first byte of the file
-	 */
-	private long offsetOfFirstByte = 0;
-	
-	private static FileTailer tailer;
-
-	static {
-		tailer = new FileTailer();
-		log =Logger.getLogger(FileTailingAdaptor.class);
-	}
-
-	public void start(long adaptorID, String type, String params, long bytes, ChunkReceiver dest) {
-	    //in this case params = filename 
-		this.adaptorID = adaptorID;
-	    this.type = type;
-	    this.dest = dest;
-	    this.attempts = 0;
-			
-	    Pattern cmd = Pattern.compile("(\\d+)\\s+(.+)\\s?");
-	    Matcher m = cmd.matcher(params);
-	    if(m.matches()) {
-	        offsetOfFirstByte = Long.parseLong(m.group(1));
-	        toWatch = new File(m.group(2));
-	    } else {
-	        toWatch = new File(params.trim());
-	    }
-	  log.info("started file tailer on file " + toWatch + " with first byte at offset "+offsetOfFirstByte);
-	  
-		this.fileReadOffset= bytes;
-		tailer.startWatchingFile(this);
-	}
-
-	/**
-	 * Do one last tail, and then stop
-	 * @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#shutdown()
-	 */
-	public long shutdown() throws AdaptorException {
-	  try{
-	    if(toWatch.exists()) {
-	    	int retry=0;
-	    	tailer.stopWatchingFile(this);
-			TerminatorThread lastTail = new TerminatorThread(this,tailer.eq);
-			lastTail.setDaemon(true);
-			lastTail.start();
-			while(lastTail.isAlive() && retry < 60) {
-				try {
-					log.info("Retry:"+retry);
-				    Thread.currentThread().sleep(1000);
-				    retry++;
-				} catch(InterruptedException ex) {
-				}
-			}
-	    }
-	  } finally {
-	    return fileReadOffset + offsetOfFirstByte;
-	  }
-
-	}
-	/**
-	 * Stop tailing the file, effective immediately.
-	 */
-	public void hardStop() throws AdaptorException {
+    this.fileReadOffset = bytes;
+    tailer.startWatchingFile(this);
+  }
+
+  /**
+   * Do one last tail, and then stop
+   * 
+   * @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#shutdown()
+   */
+  public long shutdown() throws AdaptorException {
+    try {
+      if (toWatch.exists()) {
+        int retry = 0;
         tailer.stopWatchingFile(this);
-	}
+        TerminatorThread lastTail = new TerminatorThread(this, tailer.eq);
+        lastTail.setDaemon(true);
+        lastTail.start();
+        while (lastTail.isAlive() && retry < 60) {
+          try {
+            log.info("Retry:" + retry);
+            Thread.sleep(1000);
+            retry++;
+          } catch (InterruptedException ex) {
+          }
+        }
+      }
+    } finally {
+      return fileReadOffset + offsetOfFirstByte;
+    }
+
+  }
+
+  /**
+   * Stop tailing the file, effective immediately.
+   */
+  public void hardStop() throws AdaptorException {
+    tailer.stopWatchingFile(this);
+  }
 
   /**
    * @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#getCurrentStatus()
    */
-	public String getCurrentStatus() {
-		return type.trim() + " " + offsetOfFirstByte+ " " + toWatch.getPath() + " " + fileReadOffset;
-		// can make this more efficient using a StringBuilder
-	}
-
-	public String toString() {
-		return "Tailer on " + toWatch;
-	}
-
-	public String getStreamName() {
-		return toWatch.getPath();
-	}
-	
-	/**
-	 * Looks at the tail of the associated file, adds some of it to event queue
-	 * This method is not thread safe. Returns true if there's more data in the
-	 * file
-	 * 
-	 * @param eq the queue to write Chunks into
-	 */
-	public synchronized boolean tailFile(ChunkReceiver eq) throws InterruptedException {
+  public String getCurrentStatus() {
+    return type.trim() + " " + offsetOfFirstByte + " " + toWatch.getPath()
+        + " " + fileReadOffset;
+    // can make this more efficient using a StringBuilder
+  }
+
+  public String toString() {
+    return "Tailer on " + toWatch;
+  }
+
+  public String getStreamName() {
+    return toWatch.getPath();
+  }
+
+  /**
+   * Looks at the tail of the associated file and adds some of it to the
+   * event queue. This method is not thread-safe. Returns true if there's
+   * more data in the file.
+   * 
+   * @param eq the queue to write Chunks into
+   */
+  public synchronized boolean tailFile(ChunkReceiver eq)
+      throws InterruptedException {
     boolean hasMoreData = false;
 
-    
-	    try {
-	      if( (adaptorInError == true)  && (System.currentTimeMillis() > gracefulPeriodExpired)) {
-	        if (!toWatch.exists()) {
-	          log.warn("Adaptor|" + adaptorID +"|attempts=" +  attempts + "| File does not exist: "+toWatch.getAbsolutePath()+", streaming policy expired.  File removed from streaming.");
-	        } else if (!toWatch.canRead()) {
-	          log.warn("Adaptor|" + adaptorID +"|attempts=" +  attempts + "| File cannot be read: "+toWatch.getAbsolutePath()+", streaming policy expired.  File removed from streaming.");
-	        } else {
-	          // Should have never been there
-	          adaptorInError = false;
-	          gracefulPeriodExpired = 0L;
-	          attempts = 0;
-	          return false;
-	        }
-
-
-	        ChukwaAgent agent = ChukwaAgent.getAgent();
-	        if (agent != null) {
-	          agent.stopAdaptor(adaptorID, false);
-	        } else {
-	          log.info("Agent is null, running in default mode");
-	        }
-	        return false;
-
-	      } else if(!toWatch.exists() || !toWatch.canRead()) {
-	        if (adaptorInError == false) {
-	          long now = System.currentTimeMillis();
-	          gracefulPeriodExpired = now + GRACEFUL_PERIOD;
-	          adaptorInError = true;
-	          attempts = 0;
-	          log.warn("failed to stream data for: "+toWatch.getAbsolutePath()+", graceful period will Expire at now:" + now 
-	              + " + " +  GRACEFUL_PERIOD + " secs, i.e:" + gracefulPeriodExpired);
-	        } else if (attempts%10 == 0) {
-	            log.info("failed to stream data for: "+toWatch.getAbsolutePath()+", attempt: "+attempts);  
-	        }
-
-	        attempts++;
-	        return false;  //no more data
-	      } 
-	        
-	      	if (reader == null)
-	      	{
-	      		reader = new RandomAccessFile(toWatch, "r");
-	      		log.info("Adaptor|" + adaptorID + "|Opening the file for the first time|seek|" + fileReadOffset);
-	      	}
-	      	
-	      	long len = 0L;
-	    	try {
-		      	RandomAccessFile newReader = new RandomAccessFile(toWatch,"r");
-		    	len = reader.length();
-		    	long newLength = newReader.length();
-		      	if(newLength<len && fileReadOffset >= len) {
-		      		reader.close();
-		      		reader = newReader;
-		      		fileReadOffset=0L;
-		      		log.debug("Adaptor|" + adaptorID +"| File size mismatched, rotating: "+toWatch.getAbsolutePath());
-		      	} else {
-		      		try {
-		      		    newReader.close();
-		      		} catch(IOException e) {
-		      			// do nothing.
-		      		}
-		      	}
-	    	} catch(IOException e) {
-      			// do nothing, if file doesn't exist.	    		
-	    	}
-	    	if (len >= fileReadOffset) {
-	    		if(offsetOfFirstByte>fileReadOffset) {
-	    			// If the file rotated, the recorded offsetOfFirstByte is greater than file size,
-	    			// reset the first byte position to beginning of the file.	
-	    			fileReadOffset=0;
-	    			offsetOfFirstByte = 0L;       
-	    			log.warn("offsetOfFirstByte>fileReadOffset, resetting offset to 0");
-	    		}
-	    		
-	    		log.debug("Adaptor|" + adaptorID + "|seeking|" + fileReadOffset );
-	    		reader.seek(fileReadOffset);
-	    
-	    		long bufSize = len - fileReadOffset;
-	    		
-	    		if (conf == null)
-	    		{
-	    			ChukwaAgent agent = ChukwaAgent.getAgent();
-	    			if (agent != null)
-	    			{
-	    				conf = agent.getConfiguration();
-	        			if (conf != null)
-	        			{
-	        				MAX_READ_SIZE= conf.getInt("chukwaAgent.fileTailingAdaptor.maxReadSize", DEFAULT_MAX_READ_SIZE);
-	        				log.info("chukwaAgent.fileTailingAdaptor.maxReadSize: " + MAX_READ_SIZE);
-	        			}	
-	        			else
-	        			{
-	        				log.info("Conf is null, running in default mode");
-	        			}
-	    			}
-	    			else
-	    			{
-	    				log.info("Agent is null, running in default mode");
-	    			}
-	    		}
-	    		
-	    		if (bufSize > MAX_READ_SIZE) {
-	    			bufSize = MAX_READ_SIZE;
-	    			hasMoreData = true;
-	    		}
-	    		byte[] buf = new byte[(int) bufSize];
-	    		
-	    		
-	    		long curOffset = fileReadOffset;
-	    		
-	    		int bufferRead = reader.read(buf);
-	    		assert reader.getFilePointer() == fileReadOffset + bufSize : " event size arithmetic is broken: "
-	    		  + " pointer is "
-	    		  + reader.getFilePointer()
-	    		  + " but offset is " + fileReadOffset + bufSize;
-
-	    		int bytesUsed = extractRecords(dest, fileReadOffset + offsetOfFirstByte, buf);
-
-	    		// ===   WARNING   ===
-	    		// If we couldn't found a complete record AND
-	    		// we cannot read more, i.e bufferRead == MAX_READ_SIZE 
-	    		// it's because the record is too BIG
-	    		// So log.warn, and drop current buffer so we can keep moving
-	    		// instead of being stopped at that point for ever
-	    		if ( bytesUsed == 0 && bufferRead ==  MAX_READ_SIZE) {
-	    		  log.warn("bufferRead == MAX_READ_SIZE AND bytesUsed == 0, droping current buffer: startOffset=" 
-	    		      + curOffset + ", MAX_READ_SIZE=" + MAX_READ_SIZE + ", for " + toWatch.getPath());
-	    		  bytesUsed = buf.length;
-	    		}
-
-	    		fileReadOffset = fileReadOffset + bytesUsed;
-	    		
-	    		
-	    		log.debug("Adaptor|" + adaptorID + "|start|" + curOffset + "|end|"+ fileReadOffset);
-	    		
-	    		
-	    	} else {
-	    	  // file has rotated and no detection
-	    	  reader.close();
-	    	  reader=null;
-	    	  fileReadOffset = 0L;
-	    	  offsetOfFirstByte = 0L;
-	    	  hasMoreData = true;
-	    	  log.warn("Adaptor|" + adaptorID +"| file: " + toWatch.getPath() +", has rotated and no detection - reset counters to 0L");	    	
-	    	}
-	    } catch (IOException e) {
-	    	log.warn("failure reading " + toWatch, e);
-	    }
-	    attempts=0;
-	    adaptorInError = false;
-	    return hasMoreData;
-	}
-	
+    try {
+      if ((adaptorInError == true)
+          && (System.currentTimeMillis() > gracefulPeriodExpired)) {
+        if (!toWatch.exists()) {
+          log.warn("Adaptor|" + adaptorID + "|attempts=" + attempts
+              + "| File does not exist: " + toWatch.getAbsolutePath()
+              + ", streaming policy expired.  File removed from streaming.");
+        } else if (!toWatch.canRead()) {
+          log.warn("Adaptor|" + adaptorID + "|attempts=" + attempts
+              + "| File cannot be read: " + toWatch.getAbsolutePath()
+              + ", streaming policy expired.  File removed from streaming.");
+        } else {
+          // Should have never been there
+          adaptorInError = false;
+          gracefulPeriodExpired = 0L;
+          attempts = 0;
+          return false;
+        }
+
+        ChukwaAgent agent = ChukwaAgent.getAgent();
+        if (agent != null) {
+          agent.stopAdaptor(adaptorID, false);
+        } else {
+          log.info("Agent is null, running in default mode");
+        }
+        return false;
+
+      } else if (!toWatch.exists() || !toWatch.canRead()) {
+        if (adaptorInError == false) {
+          long now = System.currentTimeMillis();
+          gracefulPeriodExpired = now + GRACEFUL_PERIOD;
+          adaptorInError = true;
+          attempts = 0;
+          log.warn("failed to stream data for: " + toWatch.getAbsolutePath()
+              + ", graceful period will Expire at now:" + now + " + "
+              + GRACEFUL_PERIOD + " secs, i.e:" + gracefulPeriodExpired);
+        } else if (attempts % 10 == 0) {
+          log.info("failed to stream data for: " + toWatch.getAbsolutePath()
+              + ", attempt: " + attempts);
+        }
+
+        attempts++;
+        return false; // no more data
+      }
+
+      if (reader == null) {
+        reader = new RandomAccessFile(toWatch, "r");
+        log.info("Adaptor|" + adaptorID
+            + "|Opening the file for the first time|seek|" + fileReadOffset);
+      }
+
+      long len = 0L;
+      try {
+        RandomAccessFile newReader = new RandomAccessFile(toWatch, "r");
+        len = reader.length();
+        long newLength = newReader.length();
+        if (newLength < len && fileReadOffset >= len) {
+          reader.close();
+          reader = newReader;
+          fileReadOffset = 0L;
+          log.debug("Adaptor|" + adaptorID
+              + "| File size mismatched, rotating: "
+              + toWatch.getAbsolutePath());
+        } else {
+          try {
+            newReader.close();
+          } catch (IOException e) {
+            // do nothing.
+          }
+        }
+      } catch (IOException e) {
+        // do nothing, if file doesn't exist.
+      }
+      if (len >= fileReadOffset) {
+        if (offsetOfFirstByte > fileReadOffset) {
+          // If the file rotated, the recorded offsetOfFirstByte is greater than
+          // file size,
+          // reset the first byte position to beginning of the file.
+          fileReadOffset = 0;
+          offsetOfFirstByte = 0L;
+          log.warn("offsetOfFirstByte>fileReadOffset, resetting offset to 0");
+        }
+
+        log.debug("Adaptor|" + adaptorID + "|seeking|" + fileReadOffset);
+        reader.seek(fileReadOffset);
+
+        long bufSize = len - fileReadOffset;
+
+        if (conf == null) {
+          ChukwaAgent agent = ChukwaAgent.getAgent();
+          if (agent != null) {
+            conf = agent.getConfiguration();
+            if (conf != null) {
+              MAX_READ_SIZE = conf.getInt(
+                  "chukwaAgent.fileTailingAdaptor.maxReadSize",
+                  DEFAULT_MAX_READ_SIZE);
+              log.info("chukwaAgent.fileTailingAdaptor.maxReadSize: "
+                  + MAX_READ_SIZE);
+            } else {
+              log.info("Conf is null, running in default mode");
+            }
+          } else {
+            log.info("Agent is null, running in default mode");
+          }
+        }
+
+        if (bufSize > MAX_READ_SIZE) {
+          bufSize = MAX_READ_SIZE;
+          hasMoreData = true;
+        }
+        byte[] buf = new byte[(int) bufSize];
+
+        long curOffset = fileReadOffset;
+
+        int bufferRead = reader.read(buf);
+        assert reader.getFilePointer() == fileReadOffset + bufSize : " event size arithmetic is broken: "
+            + " pointer is "
+            + reader.getFilePointer()
+            + " but offset is "
+            + fileReadOffset + bufSize;
+
+        int bytesUsed = extractRecords(dest,
+            fileReadOffset + offsetOfFirstByte, buf);
+
+        // === WARNING ===
+        // If we couldn't find a complete record AND we cannot read more,
+        // i.e. bufferRead == MAX_READ_SIZE, it's because the record is too
+        // BIG. So log.warn, and drop the current buffer so we can keep
+        // moving instead of being stuck at that point forever.
+        if (bytesUsed == 0 && bufferRead == MAX_READ_SIZE) {
+          log
+              .warn("bufferRead == MAX_READ_SIZE AND bytesUsed == 0, droping current buffer: startOffset="
+                  + curOffset
+                  + ", MAX_READ_SIZE="
+                  + MAX_READ_SIZE
+                  + ", for "
+                  + toWatch.getPath());
+          bytesUsed = buf.length;
+        }
+
+        fileReadOffset = fileReadOffset + bytesUsed;
+
+        log.debug("Adaptor|" + adaptorID + "|start|" + curOffset + "|end|"
+            + fileReadOffset);
+
+      } else {
+        // file has rotated but rotation was not detected
+        reader.close();
+        reader = null;
+        fileReadOffset = 0L;
+        offsetOfFirstByte = 0L;
+        hasMoreData = true;
+        log.warn("Adaptor|" + adaptorID + "| file: " + toWatch.getPath()
+            + ", has rotated and no detection - reset counters to 0L");
+      }
+    } catch (IOException e) {
+      log.warn("failure reading " + toWatch, e);
+    }
+    attempts = 0;
+    adaptorInError = false;
+    return hasMoreData;
+  }
+
   /**
    * Extract records from a byte sequence
+   * 
    * @param eq the queue to stick the new chunk[s] in
    * @param buffOffsetInFile the byte offset in the stream at which buf[] begins
    * @param buf the byte buffer to extract records from
    * @return the number of bytes processed
    * @throws InterruptedException
    */
-  protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile, byte[] buf)
-      throws InterruptedException
-  {
-    ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(), buffOffsetInFile + buf.length,
-        buf, this);
+  protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile,
+      byte[] buf) throws InterruptedException {
+    ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(),
+        buffOffsetInFile + buf.length, buf, this);
 
     eq.add(chunk);
     return buf.length;
@@ -336,5 +351,4 @@
     return type;
   }
 
-
 }
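
Buried in the reformatting above are the two behavioral contracts of this adaptor. First, start() accepts params either as a bare filename or as "<offsetOfFirstByte> <filename>", split by the regex in the diff; a standalone sketch of just that parse (pattern copied from the code, input value hypothetical):

    import java.io.File;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class ParamsParseSketch {
      public static void main(String[] args) {
        String params = "2048 /var/log/app.log"; // hypothetical input
        Pattern cmd = Pattern.compile("(\\d+)\\s+(.+)\\s?");
        Matcher m = cmd.matcher(params);
        long offsetOfFirstByte = 0L;
        File toWatch;
        if (m.matches()) {
          offsetOfFirstByte = Long.parseLong(m.group(1));
          toWatch = new File(m.group(2));
        } else {
          toWatch = new File(params.trim());
        }
        System.out.println(toWatch + " first byte at " + offsetOfFirstByte);
      }
    }

Second, rotation detection in tailFile() works by opening a second RandomAccessFile on the same path and comparing lengths: if the file on disk is now shorter than the handle already held and the read offset is past the new end, the adaptor swaps to the new handle and rewinds to offset 0.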

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java Wed Mar 11 22:39:26 2009
@@ -1,47 +1,57 @@
 package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
 
+
 import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
 import org.apache.hadoop.chukwa.datacollection.adaptor.FileAdaptor;
 import org.apache.log4j.Logger;
 
 public class TerminatorThread extends Thread {
-	private static Logger log =Logger.getLogger(FileAdaptor.class);
+  private static Logger log = Logger.getLogger(TerminatorThread.class);
+
+  private FileTailingAdaptor adaptor = null;
+  private ChunkReceiver eq = null;
 
-	private FileTailingAdaptor adaptor = null;
-	private ChunkReceiver eq = null;
-	
-	public TerminatorThread(FileTailingAdaptor adaptor, ChunkReceiver eq) {
-		this.adaptor = adaptor;
-		this.eq = eq;
-	}
+  public TerminatorThread(FileTailingAdaptor adaptor, ChunkReceiver eq) {
+    this.adaptor = adaptor;
+    this.eq = eq;
+  }
 
   public void run() {
-    
-    long endTime = System.currentTimeMillis() + (10*60*1000); // now + 10 mins
+
+    long endTime = System.currentTimeMillis() + (10 * 60 * 1000); // now + 10
+                                                                  // mins
     int count = 0;
     log.info("Terminator thread started." + adaptor.toWatch.getPath());
     try {
       while (adaptor.tailFile(eq)) {
         if (log.isDebugEnabled()) {
-          log.debug("Terminator thread:" + adaptor.toWatch.getPath() + " still working");
+          log.debug("Terminator thread:" + adaptor.toWatch.getPath()
+              + " still working");
         }
         long now = System.currentTimeMillis();
-        if (now > endTime ) {
-          log.warn("TerminatorThread should have been finished by now! count=" + count);
-          count ++;
-          endTime = System.currentTimeMillis() + (10*60*1000); // now + 10 mins
-          if (count >3 ) {
-            log.warn("TerminatorThread should have been finished by now, stopping it now! count=" + count);
+        if (now > endTime) {
+          log.warn("TerminatorThread should have been finished by now! count="
+              + count);
+          count++;
+          endTime = System.currentTimeMillis() + (10 * 60 * 1000); // now + 10
+                                                                   // mins
+          if (count > 3) {
+            log
+                .warn("TerminatorThread should have been finished by now, stopping it now! count="
+                    + count);
             break;
           }
         }
       }
     } catch (InterruptedException e) {
-      log.info("InterruptedException on Terminator thread:" + adaptor.toWatch.getPath(),e);
+      log.info("InterruptedException on Terminator thread:"
+          + adaptor.toWatch.getPath(), e);
     } catch (Throwable e) {
-      log.warn("Exception on Terminator thread:" + adaptor.toWatch.getPath(),e);
+      log
+          .warn("Exception on Terminator thread:" + adaptor.toWatch.getPath(),
+              e);
     }
-    
+
     log.info("Terminator thread finished." + adaptor.toWatch.getPath());
     try {
       adaptor.reader.close();

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java Wed Mar 11 22:39:26 2009
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.chukwa.datacollection.agent;
 
+
 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
 import org.apache.log4j.Logger;
 
@@ -26,47 +27,49 @@
  * 
  */
 public class AdaptorFactory {
-   
+
   static Logger log = Logger.getLogger(ChukwaAgent.class);
-    /**
-     * Instantiate an adaptor that can be added by the {@link ChukwaAgent}
-     * @param className the name of the {@link Adaptor} class to instantiate
-     * @return an Adaptor of the specified type
-     */
-    static Adaptor createAdaptor(String className){
+
+  /**
+   * Instantiate an adaptor that can be added by the {@link ChukwaAgent}
+   * 
+   * @param className the name of the {@link Adaptor} class to instantiate
+   * @return an Adaptor of the specified type
+   */
+  static Adaptor createAdaptor(String className) {
     Object obj = null;
-    try{
-      //the following reflection business for type checking is probably unnecessary
-      //since it will just throw a ClassCastException on error anyway.
+    try {
+      // the following reflection business for type checking is probably
+      // unnecessary
+      // since it will just throw a ClassCastException on error anyway.
       obj = Class.forName(className).newInstance();
-      if (Adaptor.class.isInstance(obj)){
+      if (Adaptor.class.isInstance(obj)) {
         return (Adaptor) obj;
-      } 
-      else        
+      } else
         return null;
-    } catch (Exception e1){
-      log.warn("Error instantiating new adaptor by class name, " +
-      		"attempting again, but with default chukwa package prepended, i.e. " +
-      		"org.apache.hadoop.chukwa.datacollection.adaptor." + className + ". " 
-      		+ e1);
-      try{
-        //if failed, try adding default class prefix
+    } catch (Exception e1) {
+      log
+          .warn("Error instantiating new adaptor by class name, "
+              + "attempting again, but with default chukwa package prepended, i.e. "
+              + "org.apache.hadoop.chukwa.datacollection.adaptor." + className
+              + ". " + e1);
+      try {
+        // if failed, try adding default class prefix
         Object obj2 = Class.forName(
-            "org.apache.hadoop.chukwa.datacollection.adaptor." +
-            className).newInstance();
-        if (Adaptor.class.isInstance(obj2)){
-          log.debug("Succeeded in finding class by adding default chukwa " +
-              "namespace prefix to class name profided");
+            "org.apache.hadoop.chukwa.datacollection.adaptor." + className)
+            .newInstance();
+        if (Adaptor.class.isInstance(obj2)) {
+          log.debug("Succeeded in finding class by adding default chukwa "
+              + "namespace prefix to class name profided");
           return (Adaptor) obj2;
-        } 
-        else        
+        } else
           return null;
       } catch (Exception e2) {
-        System.out.println("Also error instantiating new adaptor by classname" +
-        		"with prefix added" + e2);
+        System.out.println("Also error instantiating new adaptor by classname"
+            + "with prefix added" + e2);
         return null;
       }
     }
   }
-  
+
 }
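
AdaptorFactory's lookup is a generic two-step reflective pattern: try the class name as given, then retry with the default package prepended. A self-contained sketch using only the JDK (the JDK package "java.util" stands in for org.apache.hadoop.chukwa.datacollection.adaptor, which this sketch does not depend on):

    public class PrefixedLookupSketch {
      static Object createByName(String className, String defaultPackage) {
        try {
          return Class.forName(className).newInstance();
        } catch (Exception e1) {
          try {
            // as in AdaptorFactory: retry with the default package prepended
            return Class.forName(defaultPackage + "." + className).newInstance();
          } catch (Exception e2) {
            return null;
          }
        }
      }

      public static void main(String[] args) {
        // "ArrayList" alone fails; prefixing "java.util" succeeds
        Object o = createByName("ArrayList", "java.util");
        System.out.println(o == null ? "not found" : o.getClass().getName());
      }
    }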

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java Wed Mar 11 22:39:26 2009
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.chukwa.datacollection.agent;
 
+
 import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
 import java.io.IOException;
@@ -28,7 +29,6 @@
 import java.net.Socket;
 import java.net.SocketException;
 import java.util.Map;
-
 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
 import org.apache.log4j.Logger;
@@ -39,8 +39,8 @@
  * programmatically and via telnet.
  * 
  * The port to bind to can be specified by setting option
- * chukwaAgent.agent.control.port.
- * A port of 0 creates a socket on any free port.
+ * chukwaAgent.agent.control.port. A port of 0 creates a socket on any free
+ * port.
  */
 public class AgentControlSocketListener extends Thread {
 
@@ -63,7 +63,8 @@
       try {
         InputStream in = connection.getInputStream();
         BufferedReader br = new BufferedReader(new InputStreamReader(in));
-        PrintStream out = new PrintStream(new BufferedOutputStream(connection.getOutputStream()));
+        PrintStream out = new PrintStream(new BufferedOutputStream(connection
+            .getOutputStream()));
         String cmd = null;
         while ((cmd = br.readLine()) != null) {
           processCommand(cmd, out);
@@ -89,11 +90,13 @@
     public void processCommand(String cmd, PrintStream out) throws IOException {
       String[] words = cmd.split("\\s+");
       if (log.isDebugEnabled()) {
-        log.debug("command from " + connection.getRemoteSocketAddress() + ":"+ cmd);
+        log.debug("command from " + connection.getRemoteSocketAddress() + ":"
+            + cmd);
       }
 
       if (words[0].equalsIgnoreCase("help")) {
-        out.println("you're talking to the Chukwa agent.  Commands available: ");
+        out
+            .println("you're talking to the Chukwa agent.  Commands available: ");
         out.println("add [adaptorname] [args] [offset] -- start an adaptor");
         out.println("shutdown [adaptornumber]  -- graceful stop");
         out.println("stop [adaptornumber]  -- abrupt stop");
@@ -177,7 +180,8 @@
 
     this.setDaemon(false); // to keep the local agent alive
     this.agent = agent;
-    this.portno = agent.getConfiguration().getInt("chukwaAgent.control.port", 9093);
+    this.portno = agent.getConfiguration().getInt("chukwaAgent.control.port",
+        9093);
     log.info("AgentControlSocketListerner ask for port: " + portno);
     this.setName("control socket listener");
   }
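
Because the listener speaks line-oriented plain text, it can be driven programmatically with a few lines of socket code as easily as via telnet. A hedged sketch of such a client (the host, and the assumption that an agent is already listening on the default control port 9093 from the diff, are illustrative):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.io.PrintStream;
    import java.net.Socket;

    public class ControlClientSketch {
      public static void main(String[] args) throws Exception {
        // 9093 is the default for chukwaAgent.control.port in the diff
        Socket s = new Socket("localhost", 9093);
        PrintStream out = new PrintStream(s.getOutputStream());
        BufferedReader in = new BufferedReader(
            new InputStreamReader(s.getInputStream()));
        out.println("help"); // same commands a telnet user would type
        out.flush();
        String line;
        while ((line = in.readLine()) != null) // blocks until the agent closes
          System.out.println(line);
        s.close();
      }
    }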

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java Wed Mar 11 22:39:26 2009
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.chukwa.datacollection.agent;
 
+
 import org.apache.hadoop.chukwa.datacollection.DataFactory;
 import org.apache.hadoop.chukwa.datacollection.adaptor.*;
 import org.apache.hadoop.chukwa.datacollection.connector.*;
@@ -27,7 +28,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
-
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Matcher;
@@ -39,16 +39,14 @@
  * be embeddable, for use in testing.
  * 
  */
-public class ChukwaAgent
-{
-  //boolean WRITE_CHECKPOINTS = true;
+public class ChukwaAgent {
+  // boolean WRITE_CHECKPOINTS = true;
 
   static Logger log = Logger.getLogger(ChukwaAgent.class);
   static ChukwaAgent agent = null;
   private static PidFile pFile = null;
 
-  public static ChukwaAgent getAgent()
-  {
+  public static ChukwaAgent getAgent() {
     return agent;
   }
 
@@ -56,10 +54,8 @@
   Connector connector = null;
 
   // doesn't need an equals(), comparator, etc
-  private static class Offset
-  {
-    public Offset(long l, long id)
-    {
+  private static class Offset {
+    public Offset(long l, long id) {
       offset = l;
       this.id = id;
     }
@@ -68,13 +64,11 @@
     private volatile long offset;
   }
 
-  public static class AlreadyRunningException extends Exception
-  {
+  public static class AlreadyRunningException extends Exception {
 
     private static final long serialVersionUID = 1L;
 
-    public AlreadyRunningException()
-    {
+    public AlreadyRunningException() {
       super("Agent already running; aborting");
     }
   }
@@ -105,41 +99,37 @@
   public int getControllerPort() {
     return controlSock.getPort();
   }
-  
+
   /**
    * @param args
    * @throws AdaptorException
    */
-  public static void main(String[] args) throws AdaptorException
-  {
+  public static void main(String[] args) throws AdaptorException {
 
     pFile = new PidFile("Agent");
     Runtime.getRuntime().addShutdownHook(pFile);
 
-    try
-    {
+    try {
       if (args.length > 0 && args[0].equals("-help")) {
-        System.out.println("usage:  LocalAgent [-noCheckPoint]" +
-            "[default collector URL]");
+        System.out.println("usage:  LocalAgent [-noCheckPoint]"
+            + "[default collector URL]");
         System.exit(0);
       }
       Configuration conf = readConfig();
       ChukwaAgent localAgent = new ChukwaAgent(conf);
 
-      if (agent.anotherAgentIsRunning())
-      {
+      if (agent.anotherAgentIsRunning()) {
         System.out
-            .println("another agent is running (or port has been usurped). " +
-            		"Bailing out now");
+            .println("another agent is running (or port has been usurped). "
+                + "Bailing out now");
         System.exit(-1);
       }
 
       int uriArgNumber = 0;
-      if (args.length > 0)
-      {
+      if (args.length > 0) {
         if (args[uriArgNumber].equals("local"))
           agent.connector = new ConsoleOutConnector(agent);
-        else  {
+        else {
           if (!args[uriArgNumber].contains("://"))
             args[uriArgNumber] = "http://" + args[uriArgNumber];
           agent.connector = new HttpConnector(agent, args[uriArgNumber]);
@@ -151,40 +141,34 @@
 
       log.info("local agent started on port " + agent.getControlSock().portno);
 
-    } catch (AlreadyRunningException e)
-    {
-      log
-          .error("agent started already on this machine with same portno;" +
-          		" bailing out");
+    } catch (AlreadyRunningException e) {
+      log.error("agent started already on this machine with same portno;"
+          + " bailing out");
       System.out
-          .println("agent started already on this machine with same portno;" +
-          		" bailing out");
+          .println("agent started already on this machine with same portno;"
+              + " bailing out");
       System.exit(0); // better safe than sorry
-    } catch (Exception e)
-    {
+    } catch (Exception e) {
       e.printStackTrace();
     }
   }
 
-  private boolean anotherAgentIsRunning()
-  {
+  private boolean anotherAgentIsRunning() {
     return !controlSock.isBound();
   }
 
   /**
    * @return the number of running adaptors inside this local agent
    */
-  public int adaptorCount()
-  {
+  public int adaptorCount() {
     return adaptorsByNumber.size();
   }
-  
 
   public ChukwaAgent() throws AlreadyRunningException {
     this(new Configuration());
   }
 
-  public ChukwaAgent(Configuration conf) throws AlreadyRunningException  {
+  public ChukwaAgent(Configuration conf) throws AlreadyRunningException {
     ChukwaAgent.agent = this;
     this.conf = conf;
 
@@ -193,20 +177,20 @@
     adaptorPositions = new ConcurrentHashMap<Adaptor, Offset>();
     adaptorsByNumber = new HashMap<Long, Adaptor>();
     checkpointNumber = 0;
-    
-    boolean DO_CHECKPOINT_RESTORE = conf.getBoolean("chukwaAgent.checkpoint.enabled",
-        true);
+
+    boolean DO_CHECKPOINT_RESTORE = conf.getBoolean(
+        "chukwaAgent.checkpoint.enabled", true);
     CHECKPOINT_BASE_NAME = conf.get("chukwaAgent.checkpoint.name",
         "chukwa_checkpoint_");
-    final int CHECKPOINT_INTERVAL_MS = conf.getInt("chukwaAgent.checkpoint.interval",
-        5000);
-    
-    if(conf.get("chukwaAgent.checkpoint.dir") != null)
+    final int CHECKPOINT_INTERVAL_MS = conf.getInt(
+        "chukwaAgent.checkpoint.interval", 5000);
+
+    if (conf.get("chukwaAgent.checkpoint.dir") != null)
       checkpointDir = new File(conf.get("chukwaAgent.checkpoint.dir", null));
     else
       DO_CHECKPOINT_RESTORE = false;
-    
-    if (checkpointDir!= null && !checkpointDir.exists())  {
+
+    if (checkpointDir != null && !checkpointDir.exists()) {
       checkpointDir.mkdirs();
     }
     tags = conf.get("chukwaAgent.tags", "cluster=\"unknown\"");
@@ -221,11 +205,11 @@
     if (DO_CHECKPOINT_RESTORE) {
       log.info("checkpoints are enabled, period is " + CHECKPOINT_INTERVAL_MS);
     }
-    
+
     File initialAdaptors = null;
-    if(conf.get("chukwaAgent.initial_adaptors") != null)
-      initialAdaptors= new File( conf.get("chukwaAgent.initial_adaptors"));
-    
+    if (conf.get("chukwaAgent.initial_adaptors") != null)
+      initialAdaptors = new File(conf.get("chukwaAgent.initial_adaptors"));
+
     try {
       if (DO_CHECKPOINT_RESTORE) {
         restoreFromCheckpoint();
@@ -235,35 +219,35 @@
     }
 
     try {
-      if (initialAdaptors != null && initialAdaptors.exists() && checkpointNumber ==0)
-        readAdaptorsFile(initialAdaptors); //don't read after checkpoint restore
+      if (initialAdaptors != null && initialAdaptors.exists()
+          && checkpointNumber == 0)
+        readAdaptorsFile(initialAdaptors); // don't read after checkpoint
+                                           // restore
     } catch (IOException e) {
       log.warn("couldn't read user-specified file "
           + initialAdaptors.getAbsolutePath());
     }
 
     controlSock = new AgentControlSocketListener(this);
-    try
-    {
+    try {
       controlSock.tryToBind(); // do this synchronously; if it fails, we know
       // another agent is running.
       controlSock.start(); // this sets us up as a daemon
       log.info("control socket started on port " + controlSock.portno);
 
-      
-        //shouldn't start checkpointing until we're finishing launching
-      //adaptors on boot
-      if (CHECKPOINT_INTERVAL_MS > 0 && checkpointDir!= null)  {
+      // shouldn't start checkpointing until we've finished launching
+      // adaptors on boot
+      if (CHECKPOINT_INTERVAL_MS > 0 && checkpointDir != null) {
         checkpointer = new Timer();
         checkpointer.schedule(new CheckpointTask(), 0, CHECKPOINT_INTERVAL_MS);
       }
-    } catch (IOException e)
-    {
+    } catch (IOException e) {
       log.info("failed to bind to socket; aborting agent launch", e);
       throw new AlreadyRunningException();
     }
 
   }
+
   // words should contain (space delimited):
   // 0) command ("add")
   // 1) AdaptorClassname
@@ -273,35 +257,37 @@
   // but can be arbitrarily many space
   // delimited agent specific params )
   // 4) offset
-  Pattern addCmdPattern = Pattern.compile(
-      "[aA][dD][dD]\\s+"  //command "add", any case, plus at least one space
-      + "(\\S+)\\s+" //the adaptor classname, plus at least one space
-      + "(\\S+)\\s+" //datatype, plus at least one space
-      + "(?:"    //start a non-capturing group, for the parameters
-      +         "(.*?)\\s+"    //capture the actual parameters reluctantly, followed by whitespace
-      +       ")?"    //end non-matching group for params; group is optional
-      + "(\\d+)\\s*");  // finally, an offset and some trailing whitespace
+  Pattern addCmdPattern = Pattern.compile("[aA][dD][dD]\\s+" // command "add",
+                                                             // any case, plus
+                                                             // at least one
+                                                             // space
+      + "(\\S+)\\s+" // the adaptor classname, plus at least one space
+      + "(\\S+)\\s+" // datatype, plus at least one space
+      + "(?:" // start a non-capturing group, for the parameters
+      + "(.*?)\\s+" // capture the actual parameters reluctantly, followed by
+                    // whitespace
+      + ")?" // end non-matching group for params; group is optional
+      + "(\\d+)\\s*"); // finally, an offset and some trailing whitespace
 
-  public long processCommand(String cmd)
-  {
+  public long processCommand(String cmd) {
     Matcher m = addCmdPattern.matcher(cmd);
     if (m.matches()) {
-      long offset;  //check for obvious errors first
+      long offset; // check for obvious errors first
       try {
         offset = Long.parseLong(m.group(4));
-      } catch (NumberFormatException e)  {
+      } catch (NumberFormatException e) {
         log.warn("malformed line " + cmd);
         return -1L;
       }
-      
+
       String adaptorName = m.group(1);
       String dataType = m.group(2);
       String params = m.group(3);
-      if(params == null)
+      if (params == null)
         params = "";
 
       Adaptor adaptor = AdaptorFactory.createAdaptor(adaptorName);
-      if (adaptor == null)  {
+      if (adaptor == null) {
         log.warn("Error creating adaptor from adaptor name " + adaptorName);
         return -1L;
       }
@@ -318,24 +304,21 @@
         adaptorsByNumber.put(adaptorID, adaptor);
         adaptorPositions.put(adaptor, new Offset(offset, adaptorID));
         needNewCheckpoint = true;
-        try
-        {
+        try {
           adaptor.start(adaptorID, dataType, params, offset, DataFactory
               .getInstance().getEventQueue());
           log.info("started a new adaptor, id = " + adaptorID);
           return adaptorID;
 
-        } catch (Exception e)
-        {
+        } catch (Exception e) {
           log.warn("failed to start adaptor", e);
           // FIXME: don't we need to clean up the adaptor maps here?
         }
       }
-    } else
-      if(cmd.length() > 0)
-        log.warn("only 'add' command supported in config files; cmd was: "+ cmd);
-      //no warning for blank line
-    
+    } else if (cmd.length() > 0)
+      log.warn("only 'add' command supported in config files; cmd was: " + cmd);
+    // no warning for blank line
+
     return -1;
   }
 
@@ -351,24 +334,19 @@
    * @return true if the restore succeeded
    * @throws IOException
    */
-  public boolean restoreFromCheckpoint() throws IOException
-  {
-    synchronized (checkpointDir)
-    {
-      String[] checkpointNames = checkpointDir.list(new FilenameFilter()
-      {
-        public boolean accept(File dir, String name)
-        {
+  public boolean restoreFromCheckpoint() throws IOException {
+    synchronized (checkpointDir) {
+      String[] checkpointNames = checkpointDir.list(new FilenameFilter() {
+        public boolean accept(File dir, String name) {
           return name.startsWith(CHECKPOINT_BASE_NAME);
         }
       });
-      
+
       if (checkpointNames == null) {
         log.error("Unable to list files in checkpoint dir");
         return false;
       }
-      if (checkpointNames.length == 0)
-      {
+      if (checkpointNames.length == 0) {
         log.info("No checkpoints found in " + checkpointDir);
         return false;
       }
@@ -381,18 +359,16 @@
 
       String lowestName = null;
       int lowestIndex = Integer.MAX_VALUE;
-      for (String n : checkpointNames)
-      {
+      for (String n : checkpointNames) {
         int index = Integer
             .parseInt(n.substring(CHECKPOINT_BASE_NAME.length()));
-        if (index < lowestIndex)
-        {
+        if (index < lowestIndex) {
           lowestName = n;
           lowestIndex = index;
         }
       }
 
-      checkpointNumber = lowestIndex+1;
+      checkpointNumber = lowestIndex + 1;
       File checkpoint = new File(checkpointDir, lowestName);
       readAdaptorsFile(checkpoint);
     }
@@ -400,8 +376,7 @@
   }
 
   private void readAdaptorsFile(File checkpoint) throws FileNotFoundException,
-      IOException
-  {
+      IOException {
     BufferedReader br = new BufferedReader(new InputStreamReader(
         new FileInputStream(checkpoint)));
     String cmd = null;
@@ -448,17 +423,15 @@
   public void reportCommit(Adaptor src, long uuid) {
     needNewCheckpoint = true;
     Offset o = adaptorPositions.get(src);
-    if (o != null)
-    {
-      synchronized (o)
-      { // order writes to offset, in case commits are processed out of order
+    if (o != null) {
+      synchronized (o) { // order writes to offset, in case commits are
+                         // processed out of order
         if (uuid > o.offset)
           o.offset = uuid;
       }
 
       log.info("got commit up to " + uuid + " on " + src + " = " + o.id);
-    } else
-    {
+    } else {
       log.warn("got commit up to " + uuid + "  for adaptor " + src
           + " that doesn't appear to be running: " + adaptorsByNumber.size()
           + " total");
@@ -466,16 +439,12 @@
   }
 
   class CheckpointTask extends TimerTask {
-    public void run()
-    {
-      try
-      {
-        if (needNewCheckpoint)
-        {
+    public void run() {
+      try {
+        if (needNewCheckpoint) {
           writeCheckpoint();
         }
-      } catch (IOException e)
-      {
+      } catch (IOException e) {
         log.warn("failed to write checkpoint", e);
       }
     }
@@ -494,10 +463,8 @@
    * If the adaptor is written correctly, its offset won't change after
    * returning from shutdown.
    * 
-   * @param number
-   *          the adaptor to stop
-   * @param gracefully
-   *          if true, shutdown, if false, hardStop
+   * @param number the adaptor to stop
+   * @param gracefully if true, shutdown, if false, hardStop
    * @return the number of bytes synched at stop. -1 on error
    */
   public long stopAdaptor(long number, boolean gracefully) {
@@ -516,19 +483,21 @@
     } else {
       adaptorPositions.remove(toStop);
     }
-    
-    try {    	      
+
+    try {
       if (gracefully) {
-   	    offset = toStop.shutdown(); 
-   	   log.info("shutdown on adaptor: " + number + ", " + toStop.getCurrentStatus());
-      } else { 
+        offset = toStop.shutdown();
+        log.info("shutdown on adaptor: " + number + ", "
+            + toStop.getCurrentStatus());
+      } else {
         toStop.hardStop();
-        log.info("hardStop on adaptorId: " + number + ", " + toStop.getCurrentStatus());
+        log.info("hardStop on adaptorId: " + number + ", "
+            + toStop.getCurrentStatus());
       }
     } catch (AdaptorException e) {
       log.error("adaptor failed to stop cleanly", e);
     } finally {
-    	  needNewCheckpoint = true;
+      needNewCheckpoint = true;
     }
     return offset;
   }
@@ -558,13 +527,15 @@
       chukwaConf = new File(chukwaConfName).getAbsoluteFile();
     else
       chukwaConf = new File(chukwaHome, "conf");
-    
+
     log.info("Config - CHUKWA_CONF_DIR: [" + chukwaConf.toString() + "]");
-    File agentConf = new File(chukwaConf,"chukwa-agent-conf.xml");
+    File agentConf = new File(chukwaConf, "chukwa-agent-conf.xml");
     conf.addResource(new Path(agentConf.getAbsolutePath()));
-    if(conf.get("chukwaAgent.checkpoint.dir") == null)
-      conf.set("chukwaAgent.checkpoint.dir", new File(chukwaHome, "var").getAbsolutePath());
-    conf.set("chukwaAgent.initial_adaptors", new File(chukwaConf, "initial_adaptors").getAbsolutePath());
+    if (conf.get("chukwaAgent.checkpoint.dir") == null)
+      conf.set("chukwaAgent.checkpoint.dir", new File(chukwaHome, "var")
+          .getAbsolutePath());
+    conf.set("chukwaAgent.initial_adaptors", new File(chukwaConf,
+        "initial_adaptors").getAbsolutePath());
     return conf;
   }
 
@@ -581,20 +552,19 @@
     if (checkpointer != null) {
       checkpointer.cancel();
       try {
-        if(needNewCheckpoint)
+        if (needNewCheckpoint)
           writeCheckpoint(); // write a last checkpoint here, before stopping
       } catch (IOException e) {
       }
     }
-      // adaptors
+    // adaptors
 
-    synchronized (adaptorsByNumber) { 
+    synchronized (adaptorsByNumber) {
       // shut down each adaptor
       for (Adaptor a : adaptorsByNumber.values()) {
         try {
           a.hardStop();
-        } catch (AdaptorException e)
-        {
+        } catch (AdaptorException e) {
           log.warn("failed to cleanly stop " + a, e);
         }
       }
@@ -608,8 +578,7 @@
   /**
    * Returns the last offset at which a given adaptor was checkpointed
    * 
-   * @param a
-   *          the adaptor in question
+   * @param a the adaptor in question
    * @return that adaptor's last-checkpointed offset
    */
   public long getOffset(Adaptor a) {

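For reference, the grammar that processCommand() accepts above is
"add <classname> <datatype> [params] <offset>". The following standalone
sketch reuses the exact pattern from the diff; the sample command line is
illustrative only, not taken from a real configuration:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class AddCmdDemo {
      // same grammar as addCmdPattern: add <classname> <datatype> [params] <offset>
      static final Pattern ADD = Pattern.compile("[aA][dD][dD]\\s+"
          + "(\\S+)\\s+" // adaptor classname
          + "(\\S+)\\s+" // datatype
          + "(?:(.*?)\\s+)?" // optional params, matched reluctantly
          + "(\\d+)\\s*"); // offset

      public static void main(String[] args) {
        // hypothetical adaptor line, for illustration only
        String cmd =
            "add filetailer.CharFileTailingAdaptorUTF8 SysLog /var/log/messages 0";
        Matcher m = ADD.matcher(cmd);
        if (m.matches()) {
          String params = (m.group(3) == null) ? "" : m.group(3);
          System.out.println("class=" + m.group(1) + " type=" + m.group(2)
              + " params=" + params + " offset=" + Long.parseLong(m.group(4)));
        }
      }
    }
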
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java Wed Mar 11 22:39:26 2009
@@ -18,10 +18,10 @@
 
 package org.apache.hadoop.chukwa.datacollection.agent;
 
+
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
-
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
 import org.apache.log4j.Logger;
@@ -29,75 +29,71 @@
 /**
  * An event queue that blocks once a fixed upper limit of data is enqueued.
  * 
- * For now, uses the size of the data field.  Should really use estimatedSerializedSize()?
+ * For now, uses the size of the data field. Should really use
+ * estimatedSerializedSize()?
  * 
  */
-public class MemLimitQueue implements ChunkQueue
-{
-	static Logger log = Logger.getLogger(WaitingQueue.class);
-	
-	private Queue<Chunk> queue = new LinkedList<Chunk>();
-	private long dataSize = 0;
-	private final long MAX_MEM_USAGE;
+public class MemLimitQueue implements ChunkQueue {
+  static Logger log = Logger.getLogger(WaitingQueue.class);
 
+  private Queue<Chunk> queue = new LinkedList<Chunk>();
+  private long dataSize = 0;
+  private final long MAX_MEM_USAGE;
 
-	
   public MemLimitQueue(int limit) {
     MAX_MEM_USAGE = limit;
   }
-	
-	/**
-	 * @see org.apache.hadoop.chukwa.datacollection.ChunkQueue#add(org.apache.hadoop.chukwa.Chunk)
-	 */
-	public void add(Chunk chunk) throws InterruptedException
-	{
-	  assert chunk != null: "can't enqueue null chunks";
-    synchronized(this) {
-      while(chunk.getData().length  + dataSize > MAX_MEM_USAGE)
-      {
-    	  try 
-    	  { 
-    		  this.wait();
-    		  log.info("MemLimitQueue is full [" + dataSize +"]");
-    	  }
-    	  catch(InterruptedException e) {}
+
+  /**
+   * @see org.apache.hadoop.chukwa.datacollection.ChunkQueue#add(org.apache.hadoop.chukwa.Chunk)
+   */
+  public void add(Chunk chunk) throws InterruptedException {
+    assert chunk != null : "can't enqueue null chunks";
+    synchronized (this) {
+      while (chunk.getData().length + dataSize > MAX_MEM_USAGE) {
+        try {
+          this.wait();
+          log.info("MemLimitQueue is full [" + dataSize + "]");
+        } catch (InterruptedException e) {
+        }
       }
       dataSize += chunk.getData().length;
       queue.add(chunk);
       this.notifyAll();
     }
-	 
-	}
 
-	/**
-	 * @see org.apache.hadoop.chukwa.datacollection.ChunkQueue#collect(java.util.List, int)
-	 */
-	public void collect(List<Chunk> events,int maxSize) throws InterruptedException
-	{
-		synchronized(this) {
-		  //we can't just say queue.take() here, since we're holding a lock.
-		  while(queue.isEmpty()){
-		    this.wait();
-		  }
-		  
-		  
-		  int size = 0;
-		  while(!queue.isEmpty() && (size < maxSize)) { 
-		    Chunk e = this.queue.remove();
-		    int chunkSize = e.getData().length;
-		    size += chunkSize;
-		    dataSize -= chunkSize;
-		    events.add(e);
-		  }
-		  this.notifyAll();
-		} 
-
-		if (log.isDebugEnabled()) 	{
-			log.debug("WaitingQueue.inQueueCount:" + queue.size() + "\tWaitingQueue.collectCount:" + events.size());
-		}
-	}
-
-	public int size(){
-	  return queue.size();
-	}
+  }
+
+  /**
+   * @see org.apache.hadoop.chukwa.datacollection.ChunkQueue#collect(java.util.List,
+   *      int)
+   */
+  public void collect(List<Chunk> events, int maxSize)
+      throws InterruptedException {
+    synchronized (this) {
+      // we can't just say queue.take() here, since we're holding a lock.
+      while (queue.isEmpty()) {
+        this.wait();
+      }
+
+      int size = 0;
+      while (!queue.isEmpty() && (size < maxSize)) {
+        Chunk e = this.queue.remove();
+        int chunkSize = e.getData().length;
+        size += chunkSize;
+        dataSize -= chunkSize;
+        events.add(e);
+      }
+      this.notifyAll();
+    }
+
+    if (log.isDebugEnabled()) {
+      log.debug("WaitingQueue.inQueueCount:" + queue.size()
+          + "\tWaitingQueue.collectCount:" + events.size());
+    }
+  }
+
+  public int size() {
+    return queue.size();
+  }
 }

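The reformatted MemLimitQueue above is a byte-budgeted monitor: add() waits
while the enqueued bytes would exceed MAX_MEM_USAGE, and collect() drains up
to maxSize bytes and wakes blocked writers. Below is a minimal standalone
sketch of the same wait/notifyAll pattern, simplified to raw byte arrays and
propagating InterruptedException instead of swallowing it as the original
does:

    import java.util.ArrayList;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Queue;

    public class ByteBudgetQueue {
      private final Queue<byte[]> queue = new LinkedList<byte[]>();
      private long dataSize = 0;
      private final long maxMemUsage;

      public ByteBudgetQueue(long limit) {
        maxMemUsage = limit;
      }

      public synchronized void add(byte[] data) throws InterruptedException {
        while (data.length + dataSize > maxMemUsage)
          wait(); // block the producer until a consumer frees space
        dataSize += data.length;
        queue.add(data);
        notifyAll(); // wake any consumer blocked in collect()
      }

      public synchronized List<byte[]> collect(int maxSize)
          throws InterruptedException {
        while (queue.isEmpty())
          wait(); // block the consumer until something is enqueued
        List<byte[]> out = new ArrayList<byte[]>();
        int size = 0;
        while (!queue.isEmpty() && size < maxSize) {
          byte[] d = queue.remove();
          size += d.length;
          dataSize -= d.length;
          out.add(d);
        }
        notifyAll(); // wake any producer blocked in add()
        return out;
      }
    }
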
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/WaitingQueue.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/WaitingQueue.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/WaitingQueue.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/WaitingQueue.java Wed Mar 11 22:39:26 2009
@@ -18,57 +18,50 @@
 
 package org.apache.hadoop.chukwa.datacollection.agent;
 
+
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
 import org.apache.log4j.Logger;
 
-public class WaitingQueue implements ChunkQueue
-{
+public class WaitingQueue implements ChunkQueue {
+
+  static Logger log = Logger.getLogger(WaitingQueue.class);
+  private BlockingQueue<Chunk> queue = new LinkedBlockingQueue<Chunk>(5);
 
-	static Logger log = Logger.getLogger(WaitingQueue.class);
-	private BlockingQueue<Chunk> queue = new LinkedBlockingQueue<Chunk>(5);
-	
-	public void add(Chunk event)
-	{
-	  try
-	  {
-		this.queue.put(event);
-	  }
-	  catch(InterruptedException e)
-	  {}//return upwards
-	}
-
-	public void add(List<Chunk> events)
-	{
-		this.queue.addAll(events);
-  
-	}
-
-	public void collect(List<Chunk> events,int maxCount)
-	{
-		// Workaround to block on the queue
-		try
-		{
-			events.add(this.queue.take());
-		} 
-		catch (InterruptedException e)
-		{}
-		this.queue.drainTo(events,maxCount-1);
-
-		System.out.println("collect [" + Thread.currentThread().getName() + "] [" + events.size() + "]");
-
-		if (log.isDebugEnabled())
-		{
-			log.debug("WaitingQueue.inQueueCount:" + queue.size() + "\tWaitingQueue.collectCount:" + events.size());
-		}
-	}
-	
-	 public int size(){
-	    return queue.size();
-	  }
+  public void add(Chunk event) {
+    try {
+      this.queue.put(event);
+    } catch (InterruptedException e) {
+    }// return upwards
+  }
+
+  public void add(List<Chunk> events) {
+    this.queue.addAll(events);
+
+  }
+
+  public void collect(List<Chunk> events, int maxCount) {
+    // Workaround to block on the queue
+    try {
+      events.add(this.queue.take());
+    } catch (InterruptedException e) {
+    }
+    this.queue.drainTo(events, maxCount - 1);
+
+    System.out.println("collect [" + Thread.currentThread().getName() + "] ["
+        + events.size() + "]");
+
+    if (log.isDebugEnabled()) {
+      log.debug("WaitingQueue.inQueueCount:" + queue.size()
+          + "\tWaitingQueue.collectCount:" + events.size());
+    }
+  }
+
+  public int size() {
+    return queue.size();
+  }
 
 }

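WaitingQueue.collect() above works around BlockingQueue having no blocking
bulk operation: take() blocks for the first chunk, then drainTo() tops the
batch up with whatever is already queued, without blocking again. A
self-contained illustration of that idiom on plain strings:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    public class TakeThenDrainDemo {
      public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> q = new LinkedBlockingQueue<String>(5);
        q.put("a");
        q.put("b");
        q.put("c");

        List<String> batch = new ArrayList<String>();
        int maxCount = 2;
        batch.add(q.take());            // blocks until one element is available
        q.drainTo(batch, maxCount - 1); // non-blocking top-up
        System.out.println(batch);      // prints [a, b]
      }
    }
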
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java Wed Mar 11 22:39:26 2009
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.chukwa.datacollection.collector;
 
+
 import org.mortbay.jetty.*;
 import org.mortbay.jetty.nio.SelectChannelConnector;
 import org.mortbay.jetty.servlet.*;
@@ -27,74 +28,73 @@
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 
 public class CollectorStub {
-  
+
   static int THREADS = 80;
   private static PidFile pFile = null;
   public static Server jettyServer = null;
+
   public static void main(String[] args) {
-	
-    pFile=new PidFile("Collector");
-    Runtime.getRuntime().addShutdownHook(pFile); 	 	  
+
+    pFile = new PidFile("Collector");
+    Runtime.getRuntime().addShutdownHook(pFile);
     try {
-      if(args.length>=1 && args[0].equalsIgnoreCase("-help")) {
+      if (args.length >= 1 && args[0].equalsIgnoreCase("-help")) {
         System.out.println("usage:  CollectorStub [portno] [pretend]");
-        System.out.println("note: if no portno defined, " +
-     	                     "defaults to value in chukwa-site.xml");
+        System.out.println("note: if no portno defined, "
+            + "defaults to value in chukwa-site.xml");
         System.exit(0);
       }
- 
+
       ChukwaConfiguration conf = new ChukwaConfiguration();
       int portNum = conf.getInt("chukwaCollector.http.port", 9999);
       THREADS = conf.getInt("chukwaCollector.http.threads", 80);
-      
-      if(args.length != 0)
+
+      if (args.length != 0)
         portNum = Integer.parseInt(args[0]);
-      
-        //pick a writer.
+
+      // pick a writer.
       ChukwaWriter w = null;
-      if(args.length > 1) {
-        if(args[1].equals("pretend"))
-          w= new ConsoleWriter(true);
-        else if(args[1].equals("pretend-quietly"))
+      if (args.length > 1) {
+        if (args[1].equals("pretend"))
+          w = new ConsoleWriter(true);
+        else if (args[1].equals("pretend-quietly"))
           w = new ConsoleWriter(false);
-        else if(args[1].equals("-classname")) {
-          if(args.length < 3)
+        else if (args[1].equals("-classname")) {
+          if (args.length < 3)
             System.err.println("need to specify a writer class");
           else {
             conf.set("chukwaCollector.writerClass", args[2]);
           }
-        }
-        else
-          System.out.println("WARNING: unknown command line arg "+ args[1]);
+        } else
+          System.out.println("WARNING: unknown command line arg " + args[1]);
       }
-      if(w != null) {
+      if (w != null) {
         w.init(conf);
         ServletCollector.setWriter(w);
       }
-      
-        //set up jetty connector
+
+      // set up jetty connector
       SelectChannelConnector jettyConnector = new SelectChannelConnector();
-      jettyConnector.setLowResourcesConnections(THREADS-10);
+      jettyConnector.setLowResourcesConnections(THREADS - 10);
       jettyConnector.setLowResourceMaxIdleTime(1500);
       jettyConnector.setPort(portNum);
-        //set up jetty server
+      // set up jetty server
       jettyServer = new Server(portNum);
-      
-      jettyServer.setConnectors(new Connector[]{ jettyConnector});
-      org.mortbay.thread.BoundedThreadPool pool = 
-        new org.mortbay.thread.BoundedThreadPool();
+
+      jettyServer.setConnectors(new Connector[] { jettyConnector });
+      org.mortbay.thread.BoundedThreadPool pool = new org.mortbay.thread.BoundedThreadPool();
       pool.setMaxThreads(THREADS);
       jettyServer.setThreadPool(pool);
-        //and add the servlet to it
-      Context root = new Context(jettyServer,"/",Context.SESSIONS);
+      // and add the servlet to it
+      Context root = new Context(jettyServer, "/", Context.SESSIONS);
       root.addServlet(new ServletHolder(new ServletCollector(conf)), "/*");
       jettyServer.start();
       jettyServer.setStopAtShutdown(false);
-     
+
       System.out.println("started http collector on port number " + portNum);
 
-    } catch(Exception e) {
-     e.printStackTrace();
+    } catch (Exception e) {
+      e.printStackTrace();
       System.exit(0);
     }
 

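CollectorStub's setup above is ordinary embedded Jetty 6: a Server, a Context
rooted at "/", and the collector servlet mounted at "/*". Here is a
stripped-down sketch with a trivial stand-in servlet; the connector and
BoundedThreadPool tuning from the real code are omitted, and port 9999 just
mirrors the chukwaCollector.http.port default:

    import java.io.IOException;
    import javax.servlet.http.HttpServlet;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    import org.mortbay.jetty.Server;
    import org.mortbay.jetty.servlet.Context;
    import org.mortbay.jetty.servlet.ServletHolder;

    public class MiniCollector {
      public static void main(String[] args) throws Exception {
        Server server = new Server(9999);
        Context root = new Context(server, "/", Context.SESSIONS);
        root.addServlet(new ServletHolder(new HttpServlet() {
          protected void doGet(HttpServletRequest req, HttpServletResponse resp)
              throws IOException {
            resp.setStatus(200);
            resp.getOutputStream().println("collector up");
          }
        }), "/*");
        server.start();
        server.join(); // serve until the process is killed
      }
    }
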
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java Wed Mar 11 22:39:26 2009
@@ -18,219 +18,205 @@
 
 package org.apache.hadoop.chukwa.datacollection.collector.servlet;
 
+
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.util.*;
-
 import javax.servlet.ServletConfig;
 import javax.servlet.ServletException;
 import javax.servlet.ServletOutputStream;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.chukwa.datacollection.writer.*;
 import org.apache.log4j.Logger;
 
-public class ServletCollector extends HttpServlet
-{
+public class ServletCollector extends HttpServlet {
 
   static final boolean FANCY_DIAGNOSTICS = false;
-	static org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter writer = null;
-	 
+  static org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter writer = null;
+
   private static final long serialVersionUID = 6286162898591407111L;
-  Logger log = Logger.getRootLogger();//.getLogger(ServletCollector.class);
-  
-	public static void setWriter(org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter w) throws WriterException
-	{
-	  writer = w;
-	}
-	static long statTime = 0L;
-	static int numberHTTPConnection = 0;
-	static int numberchunks = 0;
-	static long lifetimechunks=0;
-	
-	Configuration conf;
-  
+  Logger log = Logger.getRootLogger();// .getLogger(ServletCollector.class);
+
+  public static void setWriter(
+      org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter w)
+      throws WriterException {
+    writer = w;
+  }
+
+  static long statTime = 0L;
+  static int numberHTTPConnection = 0;
+  static int numberchunks = 0;
+  static long lifetimechunks = 0;
+
+  Configuration conf;
+
   public ServletCollector(Configuration c) {
-    conf =c;
+    conf = c;
   }
 
-	
-	public void init(ServletConfig servletConf) throws ServletException
-	{
-	  
-	  log.info("initing servletCollector");
-		if(servletConf == null)	{
-			log.fatal("no servlet config");
-			return;
-		}
-		
-		Timer statTimer = new Timer();
-		statTimer.schedule(new TimerTask()
-		{
-			public void run() 
-			{
-				log.info("stats:ServletCollector,numberHTTPConnection:" + numberHTTPConnection
-						 + ",numberchunks:"+numberchunks);
-				statTime = System.currentTimeMillis();
-				numberHTTPConnection = 0;
-				numberchunks = 0;
-			}
-		}, (1000), (60*1000));
-		
-		if(writer != null) {
-		  log.info("writer set up statically, no need for Collector.init() to do it");
-		  return;
-		}
-		
-		try {
-	   String writerClassName = conf.get("chukwaCollector.writerClass", 
-	          SeqFileWriter.class.getCanonicalName());
-	    Class<?> writerClass = Class.forName(writerClassName);
-	    if(writerClass != null &&ChukwaWriter.class.isAssignableFrom(writerClass))
-	        writer = (ChukwaWriter) writerClass.newInstance();
-		} catch(Exception e) {
-		  log.warn("failed to use user-chosen writer class, defaulting to SeqFileWriter", e);
-		}
-      
-    //We default to here if the pipeline construction failed or didn't happen.
-    try{ 
-      if(writer == null)
-        writer =  new SeqFileWriter();//default to SeqFileWriter
+  public void init(ServletConfig servletConf) throws ServletException {
+
+    log.info("initing servletCollector");
+    if (servletConf == null) {
+      log.fatal("no servlet config");
+      return;
+    }
+
+    Timer statTimer = new Timer();
+    statTimer.schedule(new TimerTask() {
+      public void run() {
+        log.info("stats:ServletCollector,numberHTTPConnection:"
+            + numberHTTPConnection + ",numberchunks:" + numberchunks);
+        statTime = System.currentTimeMillis();
+        numberHTTPConnection = 0;
+        numberchunks = 0;
+      }
+    }, (1000), (60 * 1000));
+
+    if (writer != null) {
+      log
+          .info("writer set up statically, no need for Collector.init() to do it");
+      return;
+    }
+
+    try {
+      String writerClassName = conf.get("chukwaCollector.writerClass",
+          SeqFileWriter.class.getCanonicalName());
+      Class<?> writerClass = Class.forName(writerClassName);
+      if (writerClass != null
+          && ChukwaWriter.class.isAssignableFrom(writerClass))
+        writer = (ChukwaWriter) writerClass.newInstance();
+    } catch (Exception e) {
+      log
+          .warn(
+              "failed to use user-chosen writer class, defaulting to SeqFileWriter",
+              e);
+    }
+
+    // We default to here if the pipeline construction failed or didn't happen.
+    try {
+      if (writer == null)
+        writer = new SeqFileWriter();// default to SeqFileWriter
       writer.init(conf);
-		} catch (WriterException e) {
-			throw new ServletException("Problem init-ing servlet", e);
-		}		
-	}
-
-	protected void accept(HttpServletRequest req, HttpServletResponse resp)
-	throws ServletException
-	{
-		numberHTTPConnection ++;
-		ServletDiagnostics diagnosticPage = new ServletDiagnostics();
-		final long currentTime = System.currentTimeMillis();
-		try {
-
-			log.debug("new post from " + req.getRemoteHost() + " at " + currentTime);
-			java.io.InputStream in = req.getInputStream();
-
-			ServletOutputStream l_out = resp.getOutputStream();
-			final DataInputStream di = new DataInputStream(in);
-			final int numEvents = di.readInt();
-			//	log.info("saw " + numEvents+ " in request");
-
-			if(FANCY_DIAGNOSTICS)
-			{ diagnosticPage.sawPost(req.getRemoteHost(), numEvents, currentTime); }
-
-			List<Chunk> events = new LinkedList<Chunk>();
-			ChunkImpl logEvent = null;
-			StringBuilder sb = new StringBuilder();
-
-			for (int i = 0; i < numEvents; i++)
-			{
-				// TODO: pass new data to all registered stream handler 
-			  //       methods for this chunk's stream
-				// TODO: should really have some dynamic assignment of events to writers
-
-				logEvent =  ChunkImpl.read(di);
-				sb.append("ok:");
-				sb.append(logEvent.getData().length);
-				sb.append(" bytes ending at offset ");
-				sb.append(logEvent.getSeqID()-1).append("\n");
-
-				events.add(logEvent);
-
-				if(FANCY_DIAGNOSTICS)
-				{ diagnosticPage.sawChunk(logEvent, i); }
-			}
-
-			// write new data to data sync file
-			if(writer != null) 
-			{
-				writer.add(events);
-				numberchunks += events.size();
-				lifetimechunks += events.size();
-				//this is where we ACK this connection
-				l_out.print(sb.toString());
-			}
-			else
-			{
-				l_out.println("can't write: no writer");
-			}
-
-
-			if(FANCY_DIAGNOSTICS)
-			{ diagnosticPage.doneWithPost(); }
-
-			resp.setStatus(200);
-
-		} 
-		catch(Throwable e) 
-		{
-			log.warn("Exception talking to " +req.getRemoteHost() + " at " + currentTime , e);
-			throw new ServletException(e);
-		}
-	}
-
-	
-	@Override
-	protected void doPost(HttpServletRequest req, HttpServletResponse resp)
-			throws ServletException, IOException
-	{
-		accept(req,resp);
-	}
-
-	@Override
-	protected void doGet(HttpServletRequest req, HttpServletResponse resp)
-			throws ServletException, IOException
-	{
-	
-		PrintStream out = new PrintStream(resp.getOutputStream());
-		resp.setStatus(200);
-		
-	  String pingAtt = req.getParameter("ping");
-	  if (pingAtt!=null)
-	  {
-		  out.println("Date:" + ServletCollector.statTime);
-		  out.println("Now:" + System.currentTimeMillis());
-		  out.println("numberHTTPConnection:" + ServletCollector.numberHTTPConnection);
-		  out.println("numberchunks:" + ServletCollector.numberchunks);
+    } catch (WriterException e) {
+      throw new ServletException("Problem init-ing servlet", e);
+    }
+  }
+
+  protected void accept(HttpServletRequest req, HttpServletResponse resp)
+      throws ServletException {
+    numberHTTPConnection++;
+    ServletDiagnostics diagnosticPage = new ServletDiagnostics();
+    final long currentTime = System.currentTimeMillis();
+    try {
+
+      log.debug("new post from " + req.getRemoteHost() + " at " + currentTime);
+      java.io.InputStream in = req.getInputStream();
+
+      ServletOutputStream l_out = resp.getOutputStream();
+      final DataInputStream di = new DataInputStream(in);
+      final int numEvents = di.readInt();
+      // log.info("saw " + numEvents+ " in request");
+
+      if (FANCY_DIAGNOSTICS) {
+        diagnosticPage.sawPost(req.getRemoteHost(), numEvents, currentTime);
+      }
+
+      List<Chunk> events = new LinkedList<Chunk>();
+      ChunkImpl logEvent = null;
+      StringBuilder sb = new StringBuilder();
+
+      for (int i = 0; i < numEvents; i++) {
+        // TODO: pass new data to all registered stream handler
+        // methods for this chunk's stream
+        // TODO: should really have some dynamic assignment of events to writers
+
+        logEvent = ChunkImpl.read(di);
+        sb.append("ok:");
+        sb.append(logEvent.getData().length);
+        sb.append(" bytes ending at offset ");
+        sb.append(logEvent.getSeqID() - 1).append("\n");
+
+        events.add(logEvent);
+
+        if (FANCY_DIAGNOSTICS) {
+          diagnosticPage.sawChunk(logEvent, i);
+        }
+      }
+
+      // write new data to data sync file
+      if (writer != null) {
+        writer.add(events);
+        numberchunks += events.size();
+        lifetimechunks += events.size();
+        // this is where we ACK this connection
+        l_out.print(sb.toString());
+      } else {
+        l_out.println("can't write: no writer");
+      }
+
+      if (FANCY_DIAGNOSTICS) {
+        diagnosticPage.doneWithPost();
+      }
+
+      resp.setStatus(200);
+
+    } catch (Throwable e) {
+      log.warn("Exception talking to " + req.getRemoteHost() + " at "
+          + currentTime, e);
+      throw new ServletException(e);
+    }
+  }
+
+  @Override
+  protected void doPost(HttpServletRequest req, HttpServletResponse resp)
+      throws ServletException, IOException {
+    accept(req, resp);
+  }
+
+  @Override
+  protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+      throws ServletException, IOException {
+
+    PrintStream out = new PrintStream(resp.getOutputStream());
+    resp.setStatus(200);
+
+    String pingAtt = req.getParameter("ping");
+    if (pingAtt != null) {
+      out.println("Date:" + ServletCollector.statTime);
+      out.println("Now:" + System.currentTimeMillis());
+      out.println("numberHTTPConnection:"
+          + ServletCollector.numberHTTPConnection);
+      out.println("numberchunks:" + ServletCollector.numberchunks);
       out.println("lifetimechunks:" + ServletCollector.lifetimechunks);
-	  }
-	  else
-	  {
-		  out.println("<html><body><h2>Chukwa servlet running</h2>");
-		  if(FANCY_DIAGNOSTICS)
-		    ServletDiagnostics.printPage(out);
-		  out.println("</body></html>");
-	  }
-    
-	  
-	}
-
-    @Override	
-	public String getServletInfo()
-	{
-		return "Chukwa Servlet Collector";
-	}
-
-	@Override
-	public void destroy()
-	{
-	  try
-	{
-		writer.close();
-	} catch (WriterException e)
-	{
-		log.warn("Exception during close", e);
-		e.printStackTrace();
-	}
-	  super.destroy();
-	}
+    } else {
+      out.println("<html><body><h2>Chukwa servlet running</h2>");
+      if (FANCY_DIAGNOSTICS)
+        ServletDiagnostics.printPage(out);
+      out.println("</body></html>");
+    }
+
+  }
+
+  @Override
+  public String getServletInfo() {
+    return "Chukwa Servlet Collector";
+  }
+
+  @Override
+  public void destroy() {
+    try {
+      writer.close();
+    } catch (WriterException e) {
+      log.warn("Exception during close", e);
+      e.printStackTrace();
+    }
+    super.destroy();
+  }
 }
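
ServletCollector.accept() above fixes the collector's wire format: the POST
body is a 4-byte event count (di.readInt()) followed by that many serialized
chunks (ChunkImpl.read(di)), and each chunk is ACKed with one
"ok:<length> bytes ending at offset <seqID - 1>" line. The sketch below shows
how a client would frame such a body; it assumes ChunkImpl exposes a
write(DataOutput) counterpart to the read() call, which this diff does not
show:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.chukwa.ChunkImpl;

    public class PostFramingSketch {
      static byte[] frame(List<ChunkImpl> chunks) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeInt(chunks.size()); // matches di.readInt() in accept()
        for (ChunkImpl c : chunks)
          c.write(out); // assumed symmetric serializer
        out.flush();
        return buf.toByteArray();
      }
    }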