You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by as...@apache.org on 2009/03/11 23:39:32 UTC

svn commit: r752666 [4/16] - in /hadoop/chukwa/trunk: ./ src/java/org/apache/hadoop/chukwa/ src/java/org/apache/hadoop/chukwa/conf/ src/java/org/apache/hadoop/chukwa/database/ src/java/org/apache/hadoop/chukwa/datacollection/ src/java/org/apache/hadoop...

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java Wed Mar 11 22:39:26 2009
@@ -17,11 +17,10 @@
  */
 package org.apache.hadoop.chukwa.datacollection.collector.servlet;
 
-import java.io.PrintStream;
 
+import java.io.PrintStream;
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.log4j.Logger;
-
 import java.util.*;
 
 /**
@@ -29,42 +28,41 @@
  */
 public class ServletDiagnostics {
 
-  static Logger log=  Logger.getLogger(ServletDiagnostics.class);
-  
+  static Logger log = Logger.getLogger(ServletDiagnostics.class);
 
   static int CHUNKS_TO_KEEP = 50;
   static int CHUNKS_TO_DISPLAY = 50;
-  
-  private static class PostStats { //statistics about a chunk
-    public PostStats(String src, int count, long receivedTs)
-    {
+
+  private static class PostStats { // statistics about a chunk
+    public PostStats(String src, int count, long receivedTs) {
       this.count = count;
       this.src = src;
       this.receivedTs = receivedTs;
       types = new String[count];
       names = new String[count];
       lengths = new int[count];
-      
+
       seenChunkCount = 0;
       dataSize = 0;
     }
+
     final int count;
     final String src;
     final long receivedTs;
     final String[] types, names;
     final int[] lengths;
-    
+
     int seenChunkCount;
     long dataSize;
-    public void addChunk(ChunkImpl c, int position)
-    {
-      if(position != seenChunkCount)
-        log.warn("servlet collector is passing chunk " + position + " but diagnostics has seen" +
-            seenChunkCount);
-      else if(seenChunkCount >= count){
-        log.warn("too many chunks in post declared as length " +count);
+
+    public void addChunk(ChunkImpl c, int position) {
+      if (position != seenChunkCount)
+        log.warn("servlet collector is passing chunk " + position
+            + " but diagnostics has seen" + seenChunkCount);
+      else if (seenChunkCount >= count) {
+        log.warn("too many chunks in post declared as length " + count);
       } else {
-        types[seenChunkCount] = c.getDataType(); 
+        types[seenChunkCount] = c.getDataType();
         lengths[seenChunkCount] = c.getData().length;
         names[seenChunkCount] = c.getStreamName();
         dataSize += c.getData().length;
@@ -72,7 +70,7 @@
       }
     }
   }
-  
+
   static {
     lastPosts = new LinkedList<PostStats>();
   }
@@ -80,74 +78,77 @@
   static LinkedList<PostStats> lastPosts;
   PostStats curPost;
 
-  
   public void sawPost(String source, int chunks, long receivedTs) {
-    if(curPost != null) {
+    if (curPost != null) {
       log.warn("should only have one HTTP post per ServletDiagnostics");
       doneWithPost();
     }
     curPost = new PostStats(source, chunks, receivedTs);
   }
-  
+
   public void sawChunk(ChunkImpl c, int pos) {
     curPost.addChunk(c, pos);
   }
 
   public static void printPage(PrintStream out) {
-    
-    HashMap<String, Long> bytesFromHost = new HashMap<String, Long>();    
+
+    HashMap<String, Long> bytesFromHost = new HashMap<String, Long>();
     long timeWindowOfSample = Long.MAX_VALUE;
     long now = System.currentTimeMillis();
 
     out.println("<ul>");
-    
-    synchronized(lastPosts) {
-      int toSkip = lastPosts.size() - CHUNKS_TO_DISPLAY; 
-      
-      if(!lastPosts.isEmpty())
-        timeWindowOfSample = now -  lastPosts.peek().receivedTs;
-      
-      for(PostStats stats: lastPosts) {
+
+    synchronized (lastPosts) {
+      int toSkip = lastPosts.size() - CHUNKS_TO_DISPLAY;
+
+      if (!lastPosts.isEmpty())
+        timeWindowOfSample = now - lastPosts.peek().receivedTs;
+
+      for (PostStats stats : lastPosts) {
         Long oldBytes = bytesFromHost.get(stats.src);
         long newBytes = stats.dataSize;
-        if(oldBytes != null)
+        if (oldBytes != null)
           newBytes += oldBytes;
         bytesFromHost.put(stats.src, newBytes);
-        
-        if( -- toSkip < 0) { //done skipping
+
+        if (--toSkip < 0) { // done skipping
           out.print("<li>");
-          
-          out.print(stats.dataSize + " bytes from " + stats.src + " at timestamp " + stats.receivedTs);
-          out.println(" which was " +  ((now - stats.receivedTs)/ 1000) + " seconds ago");
-  
+
+          out.print(stats.dataSize + " bytes from " + stats.src
+              + " at timestamp " + stats.receivedTs);
+          out.println(" which was " + ((now - stats.receivedTs) / 1000)
+              + " seconds ago");
+
           out.println("<ol>");
-          for(int i =0; i < stats.count; ++i)
-            out.println("<li> "+ stats.lengths[i] + " bytes of type " +
-                stats.types[i] + ".  Adaptor name ="+ stats.names[i] +" </li>");
+          for (int i = 0; i < stats.count; ++i)
+            out.println("<li> " + stats.lengths[i] + " bytes of type "
+                + stats.types[i] + ".  Adaptor name =" + stats.names[i]
+                + " </li>");
           out.println("</ol></li>");
         }
       }
     }
     out.println("</ul>");
     out.println("<ul>");
-    for(Map.Entry<String, Long> h: bytesFromHost.entrySet()) {
-      out.print("<li>rate from " + h.getKey() + " was " + (1000 * h.getValue() / timeWindowOfSample));
-      out.println(" bytes/second in last " + timeWindowOfSample/1000 + " seconds.</li>");
+    for (Map.Entry<String, Long> h : bytesFromHost.entrySet()) {
+      out.print("<li>rate from " + h.getKey() + " was "
+          + (1000 * h.getValue() / timeWindowOfSample));
+      out.println(" bytes/second in last " + timeWindowOfSample / 1000
+          + " seconds.</li>");
     }
-    
 
-    out.println("</ul>");    
+    out.println("</ul>");
     out.println("total of " + bytesFromHost.size() + " unique hosts seen");
 
     out.println("<p>current time is " + System.currentTimeMillis() + " </p>");
   }
 
   public void doneWithPost() {
-    synchronized(lastPosts) {
-      if(lastPosts.size() > CHUNKS_TO_KEEP)
+    synchronized (lastPosts) {
+      if (lastPosts.size() > CHUNKS_TO_KEEP)
         lastPosts.removeFirst();
       lastPosts.add(curPost);
     }
   }
-  
+
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/Connector.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/Connector.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/Connector.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/Connector.java Wed Mar 11 22:39:26 2009
@@ -18,30 +18,31 @@
 
 package org.apache.hadoop.chukwa.datacollection.connector;
 
+
 /**
- * This class is responsible for setting up a long living process that repeatedly calls the 
- * <code>send</code> function of a Sender.
+ * This class is responsible for setting up a long living process that
+ * repeatedly calls the <code>send</code> function of a Sender.
  */
 
-public interface Connector
-{
-	static final int proxyTimestampField = 0;
-	/**
+public interface Connector {
+  static final int proxyTimestampField = 0;
+  /**
 	 * 
 	 */
-	static final int proxyURIField = 1;
-	static final int proxyRetryField = 2;
-	
-	static final int adaptorTimestampField = 3;
-	static final int adaptorURIField = 4;
-
-	static final int logTimestampField = 5;
-	static final int logSourceField = 6;
-	static final int logApplicationField = 7;
-	static final int logEventField = 8;
-
-	
-	public void start();
-    public void shutdown();
-    public void reloadConfiguration();
+  static final int proxyURIField = 1;
+  static final int proxyRetryField = 2;
+
+  static final int adaptorTimestampField = 3;
+  static final int adaptorURIField = 4;
+
+  static final int logTimestampField = 5;
+  static final int logSourceField = 6;
+  static final int logApplicationField = 7;
+  static final int logEventField = 8;
+
+  public void start();
+
+  public void shutdown();
+
+  public void reloadConfiguration();
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java Wed Mar 11 22:39:26 2009
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.chukwa.datacollection.connector.http;
 
+
 /**
  * This class is responsible for setting up a {@link HttpConnectorClient} with a  collectors
  * and then repeatedly calling its send function which encapsulates the work of setting up the
@@ -31,7 +32,7 @@
  * On error, tries the list of available collectors, pauses for a minute, and then repeats.
  * </p>
  * <p> Will wait forever for collectors to come up. </p>
- 
+
  */
 
 import java.io.IOException;
@@ -40,7 +41,6 @@
 import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
-
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
 import org.apache.hadoop.chukwa.datacollection.DataFactory;
@@ -49,155 +49,154 @@
 import org.apache.hadoop.chukwa.datacollection.sender.*;
 import org.apache.log4j.Logger;
 
+public class HttpConnector implements Connector, Runnable {
 
-public class HttpConnector implements Connector, Runnable  {
-  
-	static Logger log = Logger.getLogger(HttpConnector.class);
+  static Logger log = Logger.getLogger(HttpConnector.class);
 
   static Timer statTimer = null;
   static volatile int chunkCount = 0;
-  static final int MAX_SIZE_PER_POST = 2*1024*1024;
-  static final int MIN_POST_INTERVAL= 5 * 1000;
+  static final int MAX_SIZE_PER_POST = 2 * 1024 * 1024;
+  static final int MIN_POST_INTERVAL = 5 * 1000;
   static ChunkQueue chunkQueue;
-  
+
   ChukwaAgent agent;
   String argDestination = null;
-  
+
   private volatile boolean stopMe = false;
   private boolean reloadConfiguration = false;
   private Iterator<String> collectors = null;
   protected ChukwaSender connectorClient = null;
-  
-  static{
+
+  static {
     statTimer = new Timer();
     chunkQueue = DataFactory.getInstance().getEventQueue();
     statTimer.schedule(new TimerTask() {
       public void run() {
         int count = chunkCount;
-        chunkCount = 0;           
-        log.info("# http chunks ACK'ed since last report: " + count );
+        chunkCount = 0;
+        log.info("# http chunks ACK'ed since last report: " + count);
       }
-    }, 100,60*1000);
+    }, 100, 60 * 1000);
+  }
+
+  public HttpConnector(ChukwaAgent agent) {
+    this.agent = agent;
+  }
+
+  public HttpConnector(ChukwaAgent agent, String destination) {
+    this.agent = agent;
+    this.argDestination = destination;
+
+    log.info("Setting HTTP Connector URL manually using arg passed to Agent: "
+        + destination);
+  }
+
+  public void start() {
+    (new Thread(this, "HTTP post thread")).start();
+  }
+
+  public void shutdown() {
+    stopMe = true;
+  }
+
+  public void run() {
+    log.info("HttpConnector started at time:" + System.currentTimeMillis());
+
+    Iterator<String> destinations = null;
+
+    // build a list of our destinations from collectors
+    try {
+      destinations = DataFactory.getInstance().getCollectorURLs();
+    } catch (IOException e) {
+      log.error("Failed to retreive list of collectors from "
+          + "conf/collectors file", e);
+    }
+
+    connectorClient = new ChukwaHttpSender(agent.getConfiguration());
+
+    if (argDestination != null) {
+      ArrayList<String> tmp = new ArrayList<String>();
+      tmp.add(argDestination);
+      collectors = tmp.iterator();
+      connectorClient.setCollectors(collectors);
+      log.info("using collector specified at agent runtime: " + argDestination);
+    } else if (destinations != null && destinations.hasNext()) {
+      collectors = destinations;
+      connectorClient.setCollectors(destinations);
+      log.info("using collectors from collectors file");
+    } else {
+      log.error("No collectors specified, exiting (and taking agent with us).");
+      agent.shutdown(true);// error is unrecoverable, so stop hard.
+      return;
+    }
+
+    try {
+      long lastPost = System.currentTimeMillis();
+      while (!stopMe) {
+        List<Chunk> newQueue = new ArrayList<Chunk>();
+        try {
+          // get all ready chunks from the chunkQueue to be sent
+          chunkQueue.collect(newQueue, MAX_SIZE_PER_POST); // FIXME: should
+                                                           // really do this by
+                                                           // size
+
+        } catch (InterruptedException e) {
+          System.out.println("thread interrupted during addChunks(ChunkQueue)");
+          Thread.currentThread().interrupt();
+          break;
+        }
+        int toSend = newQueue.size();
+        List<ChukwaHttpSender.CommitListEntry> results = connectorClient
+            .send(newQueue);
+        log.info("sent " + toSend + " chunks, got back " + results.size()
+            + " acks");
+        // checkpoint the chunks which were committed
+        for (ChukwaHttpSender.CommitListEntry cle : results) {
+          agent.reportCommit(cle.adaptor, cle.uuid);
+          chunkCount++;
+        }
+
+        if (reloadConfiguration) {
+          connectorClient.setCollectors(collectors);
+          log.info("Resetting colectors");
+          reloadConfiguration = false;
+        }
+
+        long now = System.currentTimeMillis();
+        if (now - lastPost < MIN_POST_INTERVAL)
+          Thread.sleep(now - lastPost); // wait for stuff to accumulate
+        lastPost = now;
+      } // end of try forever loop
+      log
+          .info("received stop() command so exiting run() loop to shutdown connector");
+    } catch (OutOfMemoryError e) {
+      log.warn("Bailing out", e);
+      System.exit(-1);
+    } catch (InterruptedException e) {
+      // interrupted while waiting between posts: log it and exit the process.
+      log.warn("Bailing out", e);
+      System.exit(-1);
+    } catch (java.io.IOException e) {
+      log.error("connector failed; shutting down agent");
+      agent.shutdown(true);
+    }
+  }
+
+  @Override
+  public void reloadConfiguration() {
+    reloadConfiguration = true;
+    Iterator<String> destinations = null;
+
+    // build a list of our destinations from collectors
+    try {
+      destinations = DataFactory.getInstance().getCollectorURLs();
+    } catch (IOException e) {
+      log.error(
+          "Failed to retreive list of collectors from conf/collectors file", e);
+    }
+    if (destinations != null && destinations.hasNext()) {
+      collectors = destinations;
+    }
+
   }
-  
-	public HttpConnector(ChukwaAgent agent)	{
-		this.agent = agent;
-	}
-
-	 public HttpConnector(ChukwaAgent agent, String destination) {
-	    this.agent = agent;
-	    this.argDestination = destination;
-
-      log.info("Setting HTTP Connector URL manually using arg passed to Agent: " + destination);
-	  }
-	
-	public void start() 	{
-		(new Thread(this, "HTTP post thread")).start();
-	}
-	
-	public void shutdown(){
-	  stopMe = true;
-	}
-	
-	public void run(){
-		log.info("HttpConnector started at time:" + System.currentTimeMillis());
-
-		Iterator<String> destinations = null;
-
-		// build a list of our destinations from collectors
-		try{
-			destinations = DataFactory.getInstance().getCollectorURLs();
-		} catch (IOException e){
-			log.error("Failed to retreive list of collectors from " +
-					"conf/collectors file", e);
-		}
-
-		connectorClient = new ChukwaHttpSender(agent.getConfiguration());
-
-		if (argDestination != null) 
-		{
-			ArrayList<String> tmp = new ArrayList<String>();
-			tmp.add(argDestination);
-			collectors = tmp.iterator();
-			connectorClient.setCollectors(collectors);
-			log.info("using collector specified at agent runtime: " + argDestination);
-		} 
-		else if (destinations != null && destinations.hasNext()) 
-		{
-			collectors = destinations;
-			connectorClient.setCollectors(destinations);
-			log.info("using collectors from collectors file");
-		} 
-		else {
-			log.error("No collectors specified, exiting (and taking agent with us).");
-			agent.shutdown(true);//error is unrecoverable, so stop hard.
-			return;
-		}
-
-		try {
-			long lastPost = System.currentTimeMillis();
-			while(!stopMe) {
-				List<Chunk> newQueue = new ArrayList<Chunk>();
-				try {
-					//get all ready chunks from the chunkQueue to be sent
-					chunkQueue.collect(newQueue,MAX_SIZE_PER_POST); //FIXME: should really do this by size
-
-				} catch(InterruptedException e) {
-					System.out.println("thread interrupted during addChunks(ChunkQueue)");
-					Thread.currentThread().interrupt();
-					break;
-				}
-				int toSend = newQueue.size();
-				List<ChukwaHttpSender.CommitListEntry> results = connectorClient.send(newQueue);
-				log.info("sent " +toSend + " chunks, got back " + results.size() + " acks");
-				//checkpoint the chunks which were committed
-				for(ChukwaHttpSender.CommitListEntry cle : results) {
-					agent.reportCommit(cle.adaptor, cle.uuid);
-					chunkCount++;
-				}
-
-				if (reloadConfiguration)
-				{
-					connectorClient.setCollectors(collectors);
-					log.info("Resetting colectors");
-					reloadConfiguration = false;
-				}
-
-				long now = System.currentTimeMillis();
-				if( now - lastPost < MIN_POST_INTERVAL )  
-					Thread.sleep(now - lastPost);  //wait for stuff to accumulate
-				lastPost = now;
-			} //end of try forever loop
-			log.info("received stop() command so exiting run() loop to shutdown connector");
-		} catch(OutOfMemoryError e) {
-			log.warn("Bailing out",e);
-			System.exit(-1);
-		} catch(InterruptedException e) {
-			//do nothing, let thread die.
-			log.warn("Bailing out",e);
-			System.exit(-1);
-		}catch(java.io.IOException e) {
-			log.error("connector failed; shutting down agent");
-			agent.shutdown(true);
-		}
-	}
-
-	@Override
-	public void reloadConfiguration()
-	{
-		reloadConfiguration = true;
-		Iterator<String> destinations = null;
-		  
-	 	// build a list of our destinations from collectors
-	 	try{
-	    destinations = DataFactory.getInstance().getCollectorURLs();
-	  } catch (IOException e){
-	    log.error("Failed to retreive list of collectors from conf/collectors file", e);
-	  }
-	  if (destinations != null && destinations.hasNext()) 
-	  {
-		  collectors = destinations;
-	  }
-    
-	}
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java Wed Mar 11 22:39:26 2009
@@ -34,29 +34,29 @@
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
-
 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
 import org.apache.log4j.Logger;
 
 /**
- * A convenience library for applications to communicate to the {@link ChukwaAgent}. Can be used
- * to register and unregister new {@link Adaptor}s. Also contains functions for applications to
- * use for handling log rations.  
+ * A convenience library for applications to communicate to the
+ * {@link ChukwaAgent}. Can be used to register and unregister new
+ * {@link Adaptor}s. Also contains functions for applications to use for
+ * handling log rotations.
  */
 public class ChukwaAgentController {
-	static Logger log = Logger.getLogger(ChukwaAgentController.class);
-  public class AddAdaptorTask extends TimerTask 
-  {
-	  	
-	    String adaptorName;
-	    String type;
-	    String params;
-	    long offset;
-	    long numRetries;
-	    long retryInterval;
-    
-    AddAdaptorTask(String adaptorName, String type, String params,
-        long offset, long numRetries, long retryInterval){
+  static Logger log = Logger.getLogger(ChukwaAgentController.class);
+
+  public class AddAdaptorTask extends TimerTask {
+
+    String adaptorName;
+    String type;
+    String params;
+    long offset;
+    long numRetries;
+    long retryInterval;
+
+    AddAdaptorTask(String adaptorName, String type, String params, long offset,
+                   long numRetries, long retryInterval) {
       this.adaptorName = adaptorName;
       this.type = type;
       this.params = params;
@@ -64,354 +64,398 @@
       this.numRetries = numRetries;
       this.retryInterval = retryInterval;
     }
+
     @Override
-    public void run() 
-    {
-    	try
-    	{
-    		log.info("Trying to resend the add command [" + adaptorName + "][" + offset + "][" + params +"] [" + numRetries+"]");
-    		add(adaptorName, type, params, offset, numRetries, retryInterval);
-    	}
-    	catch(Exception e)
-    	{
-    		log.warn("Exception in AddAdaptorTask.run", e);
-    		e.printStackTrace();
-    	} 
+    public void run() {
+      try {
+        log.info("Trying to resend the add command [" + adaptorName + "]["
+            + offset + "][" + params + "] [" + numRetries + "]");
+        add(adaptorName, type, params, offset, numRetries, retryInterval);
+      } catch (Exception e) {
+        log.warn("Exception in AddAdaptorTask.run", e);
+        e.printStackTrace();
+      }
     }
   }
 
-  //our default adaptors, provided here for convenience
+  // our default adaptors, provided here for convenience
   public static final String CharFileTailUTF8 = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8";
   public static final String CharFileTailUTF8NewLineEscaped = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped";
-  
-    
+
   static String DEFAULT_FILE_TAILER = CharFileTailUTF8NewLineEscaped;
   static int DEFAULT_PORT = 9093;
   static String DEFAULT_HOST = "localhost";
   static int numArgs = 0;
-  
-  class Adaptor{
-    public long id = -1;    
+
+  class Adaptor {
+    public long id = -1;
     final public String name;
     final public String params;
     final public String appType;
-    public long offset; 
+    public long offset;
 
-    
-    Adaptor(String adaptorName, String appType, String params, long offset){
+    Adaptor(String adaptorName, String appType, String params, long offset) {
       this.name = adaptorName;
       this.appType = appType;
       this.params = params;
       this.offset = offset;
     }
-    
-    Adaptor(long id, String adaptorName, String appType, String params, long offset){
+
+    Adaptor(long id, String adaptorName, String appType, String params,
+            long offset) {
       this.id = id;
       this.name = adaptorName;
       this.appType = appType;
       this.params = params;
       this.offset = offset;
     }
-    
+
     /**
-     * Registers this {@link Adaptor} with the agent running at the specified hostname and portno  
-     * @return The id number of the this {@link Adaptor}, assigned by the agent upon successful registration
+     * Registers this {@link Adaptor} with the agent running at the specified
+     * hostname and portno
+     * 
+     * @return The id number of the this {@link Adaptor}, assigned by the agent
+     *         upon successful registration
      * @throws IOException
      */
-    long register() throws IOException{
+    long register() throws IOException {
       Socket s = new Socket(hostname, portno);
-      PrintWriter bw = new PrintWriter(new OutputStreamWriter(s.getOutputStream()));
+      PrintWriter bw = new PrintWriter(new OutputStreamWriter(s
+          .getOutputStream()));
       bw.println("ADD " + name + " " + appType + " " + params + " " + offset);
       bw.flush();
-      BufferedReader br = new BufferedReader(new InputStreamReader(s.getInputStream()));
+      BufferedReader br = new BufferedReader(new InputStreamReader(s
+          .getInputStream()));
       String resp = br.readLine();
-      if(resp != null){
+      if (resp != null) {
         String[] fields = resp.split(" ");
-        if(fields[0].equals("OK")){
-          try{
-            id = Long.parseLong(fields[fields.length -1]);
+        if (fields[0].equals("OK")) {
+          try {
+            id = Long.parseLong(fields[fields.length - 1]);
+          } catch (NumberFormatException e) {
           }
-          catch (NumberFormatException e){}
         }
       }
       s.close();
       return id;
     }
-    
-    void unregister() throws IOException{
+
+    void unregister() throws IOException {
       Socket s = new Socket(hostname, portno);
-      PrintWriter bw = new PrintWriter(new OutputStreamWriter(s.getOutputStream()));
+      PrintWriter bw = new PrintWriter(new OutputStreamWriter(s
+          .getOutputStream()));
       bw.println("SHUTDOWN " + id);
       bw.flush();
 
-      BufferedReader br = new BufferedReader(new InputStreamReader(s.getInputStream()));
+      BufferedReader br = new BufferedReader(new InputStreamReader(s
+          .getInputStream()));
       String resp = br.readLine();
-      if( resp == null || !resp.startsWith("OK"))
-      {
-        //error.  What do we do?
-      } else if (resp.startsWith("OK")){
+      if (resp == null || !resp.startsWith("OK")) {
+        // error. What do we do?
+      } else if (resp.startsWith("OK")) {
         String[] respSplit = resp.split(" ");
-        String newOffset = respSplit[respSplit.length-1];
+        String newOffset = respSplit[respSplit.length - 1];
         try {
           offset = Long.parseLong(newOffset);
-        }catch (NumberFormatException nfe){
+        } catch (NumberFormatException nfe) {
           System.err.println("adaptor didn't shutdown gracefully.\n" + nfe);
         }
       }
-      
+
       s.close();
     }
-    
-    public String toString(){
+
+    public String toString() {
       String[] namePieces = name.split("\\.");
-      String shortName = namePieces[namePieces.length-1];
-      return id + " " + shortName + " " + appType + " " + params + " " + offset; 
+      String shortName = namePieces[namePieces.length - 1];
+      return id + " " + shortName + " " + appType + " " + params + " " + offset;
     }
   }
-  
-  Map<Long, ChukwaAgentController.Adaptor> runningAdaptors = new HashMap<Long,Adaptor>();
+
+  Map<Long, ChukwaAgentController.Adaptor> runningAdaptors = new HashMap<Long, Adaptor>();
   Map<Long, ChukwaAgentController.Adaptor> pausedAdaptors;
   String hostname;
   int portno;
-  
-  
-  public ChukwaAgentController(){
+
+  public ChukwaAgentController() {
     portno = DEFAULT_PORT;
     hostname = DEFAULT_HOST;
-    pausedAdaptors = new HashMap<Long,Adaptor>();
-    
+    pausedAdaptors = new HashMap<Long, Adaptor>();
+
     syncWithAgent();
   }
-  
-  public ChukwaAgentController(String hostname, int portno)
-  {
+
+  public ChukwaAgentController(String hostname, int portno) {
     this.hostname = hostname;
     this.portno = portno;
-    pausedAdaptors = new HashMap<Long,Adaptor>();
+    pausedAdaptors = new HashMap<Long, Adaptor>();
 
     syncWithAgent();
   }
 
   private boolean syncWithAgent() {
-    //set up adaptors by using list here
-    try{
+    // set up adaptors by using list here
+    try {
       runningAdaptors = list();
       return true;
-    }catch(IOException e){
-      System.err.println("Error initializing ChukwaClient with list of " +
-          "currently registered adaptors, clearing our local list of adaptors");
-      //e.printStackTrace();
-      //if we can't connect to the LocalAgent, reset/clear our local view of the Adaptors.
-      runningAdaptors = new HashMap<Long,ChukwaAgentController.Adaptor>();
+    } catch (IOException e) {
+      System.err
+          .println("Error initializing ChukwaClient with list of "
+              + "currently registered adaptors, clearing our local list of adaptors");
+      // e.printStackTrace();
+      // if we can't connect to the LocalAgent, reset/clear our local view of
+      // the Adaptors.
+      runningAdaptors = new HashMap<Long, ChukwaAgentController.Adaptor>();
       return false;
     }
   }
-  
+
   /**
-   * Registers a new adaptor. Makes no guarantee about success. On failure,
-   * we print a message to stderr and ignore silently so that an application
+   * Registers a new adaptor. Makes no guarantee about success. On failure, we
+   * print a message to stderr and ignore silently so that an application
    * doesn't crash if it's attempt to register an adaptor fails. This call does
    * not retry a conection. for that use the overloaded version of this which
    * accepts a time interval and number of retries
+   * 
+   * @return the id number of the adaptor, generated by the agent
+   */
+  public long add(String adaptorName, String type, String params, long offset) {
+    return add(adaptorName, type, params, offset, 20, 15 * 1000);// retry for
+                                                                 // five
+                                                                 // minutes,
+                                                                 // every
+                                                                 // fifteen
+                                                                 // seconds
+  }
+
+  /**
+   * Registers a new adaptor. Makes no guarantee about success. On failure, to
+   * connect to server, will retry <code>numRetries</code> times, every
+   * <code>retryInterval</code> milliseconds.
+   * 
+   * @return the id number of the adaptor, generated by the agent
+   */
+  public long add(String adaptorName, String type, String params, long offset,
+      long numRetries, long retryInterval) {
+    ChukwaAgentController.Adaptor adaptor = new ChukwaAgentController.Adaptor(
+        adaptorName, type, params, offset);
+    long adaptorID = -1;
+    if (numRetries >= 0) {
+      try {
+        adaptorID = adaptor.register();
+
+        if (adaptorID > 0) {
+          runningAdaptors.put(adaptorID, adaptor);
+        } else {
+          System.err
+              .println("Failed to successfully add the adaptor in AgentClient, adaptorID returned by add() was negative.");
+        }
+      } catch (IOException ioe) {
+        System.out.println("AgentClient failed to contact the agent ("
+            + hostname + ":" + portno + ")");
+        System.out
+            .println("Scheduling a agent connection retry for adaptor add() in another "
+                + retryInterval
+                + " milliseconds, "
+                + numRetries
+                + " retries remaining");
+
+        Timer addFileTimer = new Timer();
+        addFileTimer.schedule(new AddAdaptorTask(adaptorName, type, params,
+            offset, numRetries - 1, retryInterval), retryInterval);
+      }
+    } else {
+      System.err.println("Giving up on connecting to the local agent");
+    }
+    return adaptorID;
+  }
+
+  public synchronized ChukwaAgentController.Adaptor remove(long adaptorID)
+      throws IOException {
+    syncWithAgent();
+    ChukwaAgentController.Adaptor a = runningAdaptors.remove(adaptorID);
+    a.unregister();
+    return a;
+
+  }
+
+  public void remove(String className, String appType, String filename)
+      throws IOException {
+    syncWithAgent();
+    // search for FileTail adaptor with string of this file name
+    // get its id, tell it to unregister itself with the agent,
+    // then remove it from the list of adaptors
+    for (Adaptor a : runningAdaptors.values()) {
+      if (a.name.equals(className) && a.params.equals(filename)
+          && a.appType.equals(appType)) {
+        remove(a.id);
+      }
+    }
+  }
+
+  public void removeAll() {
+    syncWithAgent();
+    Long[] keyset = runningAdaptors.keySet().toArray(new Long[] {});
+
+    for (long id : keyset) {
+      try {
+        remove(id);
+      } catch (IOException ioe) {
+        System.err.println("Error removing an adaptor in removeAll()");
+        ioe.printStackTrace();
+      }
+      System.out.println("Successfully removed adaptor " + id);
+    }
+  }
+
+  Map<Long, ChukwaAgentController.Adaptor> list() throws IOException {
+    Socket s = new Socket(hostname, portno);
+    PrintWriter bw = new PrintWriter(
+        new OutputStreamWriter(s.getOutputStream()));
+
+    bw.println("LIST");
+    bw.flush();
+    BufferedReader br = new BufferedReader(new InputStreamReader(s
+        .getInputStream()));
+    String ln;
+    Map<Long, Adaptor> listResult = new HashMap<Long, Adaptor>();
+    while ((ln = br.readLine()) != null) {
+      if (ln.equals("")) {
+        break;
+      } else {
+        String[] parts = ln.split("\\s+");
+        if (parts.length >= 4) { // should have id, className appType, params,
+                                 // offset
+          long id = Long
+              .parseLong(parts[0].substring(0, parts[0].length() - 1)); // chop
+                                                                        // off
+                                                                        // the
+                                                                        // right
+                                                                        // -
+                                                                        // paren
+          long offset = Long.parseLong(parts[parts.length - 1]);
+          String tmpParams = parts[3];
+          for (int i = 4; i < parts.length - 1; i++) {
+            tmpParams += " " + parts[i];
+          }
+          listResult.put(id, new Adaptor(id, parts[1], parts[2], tmpParams,
+              offset));
+        }
+      }
+    }
+    s.close();
+    return listResult;
+  }
+
+  // ************************************************************************
+  // The following functions are convenience functions, defining an easy
+  // to use API for application developers to integrate chukwa into their app
+  // ************************************************************************
+
+  /**
+   * Registers a new "LineFileTailUTF8" adaptor and starts it at offset 0.
+   * Checks to see if the file is being watched already, if so, won't register
+   * another adaptor with the agent. If you have run the tail adaptor on this
+   * file before and rotated or emptied the file you should use
+   * {@link ChukwaAgentController#pauseFile(String, String)} and
+   * {@link ChukwaAgentController#resumeFile(String, String)} which will store
+   * the adaptors metadata and re-use them to pick up where it left off.
+   * 
+   * @param type the datatype associated with the file to pass through
+   * @param filename of the file for the tail adaptor to start monitoring
    * @return the id number of the adaptor, generated by the agent
    */
-   public long add(String adaptorName, String type, String params, long offset){
-     return add(adaptorName, type, params, offset, 20, 15* 1000);//retry for five minutes, every fifteen seconds
-   }
-   
-   /**
-    * Registers a new adaptor. Makes no guarantee about success. On failure,
-    * to connect to server, will retry <code>numRetries</code> times, every
-    * <code>retryInterval</code> milliseconds.
-    * @return the id number of the adaptor, generated by the agent
-    */
-   public long add(String adaptorName, String type, String params, long offset, long numRetries, long retryInterval){
-     ChukwaAgentController.Adaptor adaptor = new ChukwaAgentController.Adaptor(adaptorName, type, params, offset);
-     long adaptorID = -1;
-     if (numRetries >= 0){
-       try{
-         adaptorID = adaptor.register();
-
-         if (adaptorID > 0){
-           runningAdaptors.put(adaptorID,adaptor);
-         }
-         else{
-           System.err.println("Failed to successfully add the adaptor in AgentClient, adaptorID returned by add() was negative.");
-         }
-       }catch(IOException ioe){
-         System.out.println("AgentClient failed to contact the agent (" + hostname + ":" + portno + ")");
-         System.out.println("Scheduling a agent connection retry for adaptor add() in another " +
-             retryInterval + " milliseconds, " + numRetries + " retries remaining");
-         
-         Timer addFileTimer = new Timer();
-         addFileTimer.schedule(new AddAdaptorTask(adaptorName, type, params, offset, numRetries-1, retryInterval), retryInterval);
-       }
-     }else{
-       System.err.println("Giving up on connecting to the local agent");
-     }
-     return adaptorID;
-   } 
-
-   public synchronized ChukwaAgentController.Adaptor remove(long adaptorID) throws IOException
-   {
-     syncWithAgent();
-     ChukwaAgentController.Adaptor a = runningAdaptors.remove(adaptorID);
-     a.unregister();
-     return a;
-     
-   }
-   
-   public void remove(String className, String appType, String filename) throws IOException
-   {
-     syncWithAgent();
-     // search for FileTail adaptor with string of this file name
-     // get its id, tell it to unregister itself with the agent,
-     // then remove it from the list of adaptors
-     for (Adaptor a : runningAdaptors.values()){
-       if (a.name.equals(className) && a.params.equals(filename) && a.appType.equals(appType)){
-         remove(a.id);
-       }
-     }
-   }
-   
-   
-   public void removeAll(){
-     syncWithAgent();
-     Long[] keyset = runningAdaptors.keySet().toArray(new Long[]{});
-
-     for (long id : keyset){
-       try {
-         remove(id);
-       }catch(IOException ioe){
-         System.err.println("Error removing an adaptor in removeAll()");
-         ioe.printStackTrace();
-       }
-       System.out.println("Successfully removed adaptor " + id);
-     }
-   }
-   
-   Map<Long,ChukwaAgentController.Adaptor> list() throws IOException
-   {  
-     Socket s = new Socket(hostname, portno);
-     PrintWriter bw = new PrintWriter(new OutputStreamWriter(s.getOutputStream()));
-   
-     bw.println("LIST");
-     bw.flush();
-     BufferedReader br = new BufferedReader(new InputStreamReader(s.getInputStream()));
-     String ln;
-     Map<Long,Adaptor> listResult = new HashMap<Long,Adaptor>();
-     while((ln = br.readLine())!= null)
-     {
-       if (ln.equals("")){
-         break;
-       }else{
-         String[] parts = ln.split("\\s+");
-         if (parts.length >= 4){ //should have id, className appType, params, offset
-           long id = Long.parseLong(parts[0].substring(0,parts[0].length()-1)); //chop off the right-paren
-           long offset = Long.parseLong(parts[parts.length-1]);
-           String tmpParams = parts[3];
-           for (int i = 4; i<parts.length-1; i++){
-             tmpParams += " " + parts[i];
-           }
-           listResult.put(id, new Adaptor(id,parts[1],parts[2],tmpParams, offset));
-         }
-       }
-     }
-     s.close();
-     return listResult;
-   }
-   
-   //************************************************************************
-   // The following functions are convenience functions, defining an easy
-   // to use API for application developers to integrate chukwa into their app
-   //************************************************************************
-   
-   /**
-    * Registers a new "LineFileTailUTF8" adaptor and starts it at offset 0. Checks to
-    * see if the file is being watched already, if so, won't register another adaptor
-    * with the agent. If you have run the tail adaptor on this file before and rotated 
-    * or emptied the file you should use {@link ChukwaAgentController#pauseFile(String, String)}
-    * and {@link ChukwaAgentController#resumeFile(String, String)} which will store the adaptors 
-    * metadata and re-use them to pick up where it left off.
-    * @param type the datatype associated with the file to pass through
-    * @param filename of the file for the tail adaptor to start monitoring
-    * @return the id number of the adaptor, generated by the agent
-    */
-  public long addFile(String appType, String filename, long numRetries, long retryInterval)
-  {
+  public long addFile(String appType, String filename, long numRetries,
+      long retryInterval) {
     filename = new File(filename).getAbsolutePath();
-    //TODO: Mabye we want to check to see if the file exists here?
-    //      Probably not because they might be talking to an agent on a different machine?
-    
-    //check to see if this file is being watched already, if yes don't set up another adaptor for it
+    // TODO: Maybe we want to check to see if the file exists here?
+    // Probably not because they might be talking to an agent on a different
+    // machine?
+
+    // check to see if this file is being watched already, if yes don't set up
+    // another adaptor for it
     boolean isDuplicate = false;
-    for (Adaptor a : runningAdaptors.values()){
-      if (a.name.equals(DEFAULT_FILE_TAILER) && a.appType.equals(appType) && a.params.endsWith(filename)){
+    for (Adaptor a : runningAdaptors.values()) {
+      if (a.name.equals(DEFAULT_FILE_TAILER) && a.appType.equals(appType)
+          && a.params.endsWith(filename)) {
         isDuplicate = true;
       }
     }
-    if (!isDuplicate){
-      return add(DEFAULT_FILE_TAILER, appType, 0L + " " + filename,0L, numRetries, retryInterval);
-    }
-    else{
-      System.out.println("An adaptor for filename \"" + filename + "\", type \""
-          + appType + "\", exists already, addFile() command aborted");
+    if (!isDuplicate) {
+      return add(DEFAULT_FILE_TAILER, appType, 0L + " " + filename, 0L,
+          numRetries, retryInterval);
+    } else {
+      System.out.println("An adaptor for filename \"" + filename
+          + "\", type \"" + appType
+          + "\", exists already, addFile() command aborted");
       return -1;
     }
   }
-  
-  public long addFile(String appType, String filename){
+
+  public long addFile(String appType, String filename) {
     return addFile(appType, filename, 0, 0);
   }
- 
+
   /**
-   * Pause all active adaptors of the default file tailing type who are tailing this file
-   * This means we actually stop the adaptor and it goes away forever, but we store it
-   * state so that we can re-launch a new adaptor with the same state later.
+   * Pause all active adaptors of the default file tailing type who are tailing
+   * this file. This means we actually stop the adaptor and it goes away forever,
+   * but we store its state so that we can re-launch a new adaptor with the same
+   * state later.
+   * 
    * @param appType
    * @param filename
-   * @return array of adaptorID numbers which have been created and assigned the state of the formerly paused adaptors 
+   * @return array of adaptorID numbers which have been created and assigned the
+   *         state of the formerly paused adaptors
    * @throws IOException
    */
-  public Collection<Long> pauseFile(String appType, String filename) throws IOException{
+  public Collection<Long> pauseFile(String appType, String filename)
+      throws IOException {
     syncWithAgent();
-    //store the unique streamid of the file we are pausing.
-    //search the list of adaptors for this filename
-    //store the current offset for it
+    // store the unique streamid of the file we are pausing.
+    // search the list of adaptors for this filename
+    // store the current offset for it
     List<Long> results = new ArrayList<Long>();
-    for (Adaptor a : runningAdaptors.values()){
-      if (a.name.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename) && a.appType.equals(appType)){
-        pausedAdaptors.put(a.id,a); //add it to our list of paused adaptors
-        remove(a.id); //tell the agent to remove/unregister it
+    for (Adaptor a : runningAdaptors.values()) {
+      if (a.name.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename)
+          && a.appType.equals(appType)) {
+        pausedAdaptors.put(a.id, a); // add it to our list of paused adaptors
+        remove(a.id); // tell the agent to remove/unregister it
         results.add(a.id);
       }
     }
     return results;
   }
-  
-  public boolean isFilePaused(String appType, String filename){
-    for (Adaptor a : pausedAdaptors.values()){
-      if (a.name.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename) && a.appType.equals(appType)){
+
+  public boolean isFilePaused(String appType, String filename) {
+    for (Adaptor a : pausedAdaptors.values()) {
+      if (a.name.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename)
+          && a.appType.equals(appType)) {
         return true;
       }
     }
     return false;
   }
-  
+
   /**
    * Resume all adaptors for this filename that have been paused
+   * 
    * @param appType the appType
-   * @param filename filename by which to lookup adaptors which are paused (and tailing this file)
-   * @return an array of the new adaptor ID numbers which have resumed where the old adaptors left off
+   * @param filename filename by which to lookup adaptors which are paused (and
+   *        tailing this file)
+   * @return an array of the new adaptor ID numbers which have resumed where the
+   *         old adaptors left off
    * @throws IOException
    */
-  public Collection<Long> resumeFile(String appType, String filename) throws IOException{
+  public Collection<Long> resumeFile(String appType, String filename)
+      throws IOException {
     syncWithAgent();
-    //search for a record of this paused file
+    // search for a record of this paused file
     List<Long> results = new ArrayList<Long>();
-    for (Adaptor a : pausedAdaptors.values()){
-      if (a.name.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename) && a.appType.equals(appType)){
-        long newID = add(DEFAULT_FILE_TAILER, a.appType, a.offset + " " + filename, a.offset);
+    for (Adaptor a : pausedAdaptors.values()) {
+      if (a.name.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename)
+          && a.appType.equals(appType)) {
+        long newID = add(DEFAULT_FILE_TAILER, a.appType, a.offset + " "
+            + filename, a.offset);
         pausedAdaptors.remove(a.id);
         a.id = newID;
         results.add(a.id);
@@ -419,104 +463,102 @@
     }
     return results;
   }
-  
-  
-  public void removeFile(String appType, String filename) throws IOException
-  {
+
+  public void removeFile(String appType, String filename) throws IOException {
     syncWithAgent();
     // search for FileTail adaptor with string of this file name
     // get its id, tell it to unregister itself with the agent,
     // then remove it from the list of adaptors
-    for (Adaptor a : runningAdaptors.values()){
-      if (a.name.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename) && a.appType.equals(appType)){
+    for (Adaptor a : runningAdaptors.values()) {
+      if (a.name.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename)
+          && a.appType.equals(appType)) {
         remove(a.id);
       }
     }
   }
-  
-  //************************************************************************
+
+  // ************************************************************************
   // command line utilities
-  //************************************************************************
-  
-  public static void main(String[] args)
-  {
+  // ************************************************************************
+
+  public static void main(String[] args) {
     ChukwaAgentController c = getClient(args);
-    if (numArgs >= 3 && args[0].toLowerCase().equals("addfile")){
+    if (numArgs >= 3 && args[0].toLowerCase().equals("addfile")) {
       doAddFile(c, args[1], args[2]);
-    }
-    else if (numArgs >= 3 && args[0].toLowerCase().equals("removefile")){
+    } else if (numArgs >= 3 && args[0].toLowerCase().equals("removefile")) {
       doRemoveFile(c, args[1], args[2]);
-    }
-    else if(numArgs >= 1 && args[0].toLowerCase().equals("list")){
+    } else if (numArgs >= 1 && args[0].toLowerCase().equals("list")) {
       doList(c);
-    }
-    else if(numArgs >= 1 && args[0].equalsIgnoreCase("removeall")){
+    } else if (numArgs >= 1 && args[0].equalsIgnoreCase("removeall")) {
       doRemoveAll(c);
-    }
-    else{
-      System.err.println("usage: ChukwaClient addfile <apptype> <filename> [-h hostname] [-p portnumber]");
-      System.err.println("       ChukwaClient removefile adaptorID [-h hostname] [-p portnumber]");
-      System.err.println("       ChukwaClient removefile <apptype> <filename> [-h hostname] [-p portnumber]");
+    } else {
+      System.err
+          .println("usage: ChukwaClient addfile <apptype> <filename> [-h hostname] [-p portnumber]");
+      System.err
+          .println("       ChukwaClient removefile adaptorID [-h hostname] [-p portnumber]");
+      System.err
+          .println("       ChukwaClient removefile <apptype> <filename> [-h hostname] [-p portnumber]");
       System.err.println("       ChukwaClient list [IP] [port]");
       System.err.println("       ChukwaClient removeAll [IP] [port]");
     }
   }
-  
-  private static ChukwaAgentController getClient(String[] args){
+
+  private static ChukwaAgentController getClient(String[] args) {
     int portno = 9093;
     String hostname = "localhost";
 
     numArgs = args.length;
-    
-    for (int i = 0; i < args.length; i++){
-      if(args[i].equals("-h") && args.length > i + 1){
-        hostname = args[i+1];
-        System.out.println ("Setting hostname to: " + hostname);
-        numArgs -= 2; //subtract for the flag and value
-      }
-      else if (args[i].equals("-p") && args.length > i + 1){
-        portno = Integer.parseInt(args[i+1]);
-        System.out.println ("Setting portno to: " + portno);
-        numArgs -= 2; //subtract for the flat, i.e. -p, and value
+
+    for (int i = 0; i < args.length; i++) {
+      if (args[i].equals("-h") && args.length > i + 1) {
+        hostname = args[i + 1];
+        System.out.println("Setting hostname to: " + hostname);
+        numArgs -= 2; // subtract for the flag and value
+      } else if (args[i].equals("-p") && args.length > i + 1) {
+        portno = Integer.parseInt(args[i + 1]);
+        System.out.println("Setting portno to: " + portno);
+        numArgs -= 2; // subtract for the flat, i.e. -p, and value
       }
     }
     return new ChukwaAgentController(hostname, portno);
   }
-  
-  private static long doAddFile(ChukwaAgentController c, String appType, String params){
+
+  private static long doAddFile(ChukwaAgentController c, String appType,
+      String params) {
     System.out.println("Adding adaptor with filename: " + params);
     long adaptorID = c.addFile(appType, params);
-    if (adaptorID != -1){
+    if (adaptorID != -1) {
       System.out.println("Successfully added adaptor, id is:" + adaptorID);
-    }else{
-      System.err.println("Agent reported failure to add adaptor, adaptor id returned was:" + adaptorID);
+    } else {
+      System.err
+          .println("Agent reported failure to add adaptor, adaptor id returned was:"
+              + adaptorID);
     }
     return adaptorID;
   }
-  
-  private static void doRemoveFile(ChukwaAgentController c, String appType, String params){
-    try{
+
+  private static void doRemoveFile(ChukwaAgentController c, String appType,
+      String params) {
+    try {
       System.out.println("Removing adaptor with filename: " + params);
-      c.removeFile(appType,params);
-          }
-    catch(IOException e)
-    {
+      c.removeFile(appType, params);
+    } catch (IOException e) {
       e.printStackTrace();
     }
   }
-  
-  private static void doList(ChukwaAgentController c){
-    try{
+
+  private static void doList(ChukwaAgentController c) {
+    try {
       Iterator<Adaptor> adptrs = c.list().values().iterator();
-      while (adptrs.hasNext()){
+      while (adptrs.hasNext()) {
         System.out.println(adptrs.next().toString());
       }
-    } catch(Exception e){
+    } catch (Exception e) {
       e.printStackTrace();
     }
   }
-  
-  private static void doRemoveAll(ChukwaAgentController c){
+
+  private static void doRemoveAll(ChukwaAgentController c) {
     System.out.println("Removing all adaptors");
     c.removeAll();
   }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/protocol/Protocol.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/protocol/Protocol.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/protocol/Protocol.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/protocol/Protocol.java Wed Mar 11 22:39:26 2009
@@ -17,17 +17,18 @@
  */
 package org.apache.hadoop.chukwa.datacollection.protocol;
 
+
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.List;
-
 import org.apache.hadoop.chukwa.Chunk;
 
-public interface Protocol
-{
-	public byte[] toByteArray(List<Chunk> chunks);
-	public List<Chunk> parseFrom(byte[] bytes);
-	
-	void writeTo(OutputStream output);
-	List<Chunk> parseFrom(InputStream input);
+public interface Protocol {
+  public byte[] toByteArray(List<Chunk> chunks);
+
+  public List<Chunk> parseFrom(byte[] bytes);
+
+  void writeTo(OutputStream output);
+
+  List<Chunk> parseFrom(InputStream input);
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java Wed Mar 11 22:39:26 2009
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.chukwa.datacollection.sender;
 
+
 import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
 import java.io.DataOutputStream;
@@ -28,7 +29,6 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.HttpException;
 import org.apache.commons.httpclient.HttpMethod;
@@ -45,56 +45,56 @@
 import org.apache.log4j.Logger;
 
 /**
- * Encapsulates all of the http setup and connection details needed for
- * chunks to be delivered to a collector.
+ * Encapsulates all of the http setup and connection details needed for chunks
+ * to be delivered to a collector.
+ * <p>
+ * On error, tries the list of available collectors, pauses for a minute, and
+ * then repeats.
+ * </p>
  * <p>
- * On error, tries the list of available collectors, pauses for a minute, and then repeats.
+ * Will wait forever for collectors to come up.
  * </p>
- * <p> Will wait forever for collectors to come up. </p>
  */
-public class ChukwaHttpSender implements ChukwaSender{
-  final int MAX_RETRIES_PER_COLLECTOR ; //fast retries, in http client
-  final int SENDER_RETRIES; 
-  final int WAIT_FOR_COLLECTOR_REBOOT; 
-    //FIXME: this should really correspond to the timer in RetryListOfCollectors
-  
+public class ChukwaHttpSender implements ChukwaSender {
+  final int MAX_RETRIES_PER_COLLECTOR; // fast retries, in http client
+  final int SENDER_RETRIES;
+  final int WAIT_FOR_COLLECTOR_REBOOT;
+  // FIXME: this should really correspond to the timer in RetryListOfCollectors
+
   static Logger log = Logger.getLogger(ChukwaHttpSender.class);
   static HttpClient client = null;
   static MultiThreadedHttpConnectionManager connectionManager = null;
   static String currCollector = null;
 
-  
   protected Iterator<String> collectors;
-  
-  static
-  {
-    connectionManager = 
-          new MultiThreadedHttpConnectionManager();
+
+  static {
+    connectionManager = new MultiThreadedHttpConnectionManager();
     client = new HttpClient(connectionManager);
     connectionManager.closeIdleConnections(1000);
   }
-  
+
   public static class CommitListEntry {
     public Adaptor adaptor;
     public long uuid;
-    
-    public CommitListEntry(Adaptor a, long uuid)  {
+
+    public CommitListEntry(Adaptor a, long uuid) {
       adaptor = a;
       this.uuid = uuid;
     }
   }
-  
-//FIXME: probably we're better off with an EventListRequestEntity
+
+  // FIXME: probably we're better off with an EventListRequestEntity
   static class BuffersRequestEntity implements RequestEntity {
     List<DataOutputBuffer> buffers;
-    
+
     public BuffersRequestEntity(List<DataOutputBuffer> buf) {
-      buffers=buf;
+      buffers = buf;
     }
 
-    public long getContentLength()  {
-      long len=4;//first we send post length, then buffers
-      for(DataOutputBuffer b: buffers)
+    public long getContentLength() {
+      long len = 4;// first we send post length, then buffers
+      for (DataOutputBuffer b : buffers)
         len += b.getLength();
       return len;
     }
@@ -103,167 +103,179 @@
       return "application/octet-stream";
     }
 
-    public boolean isRepeatable()  {
+    public boolean isRepeatable() {
       return true;
     }
 
-    public void writeRequest(OutputStream out) throws IOException  {
+    public void writeRequest(OutputStream out) throws IOException {
       DataOutputStream dos = new DataOutputStream(out);
       dos.writeInt(buffers.size());
-      for(DataOutputBuffer b: buffers)
+      for (DataOutputBuffer b : buffers)
         dos.write(b.getData(), 0, b.getLength());
     }
   }
 
-  public ChukwaHttpSender(Configuration c){
-    //setup default collector
+  public ChukwaHttpSender(Configuration c) {
+    // setup default collector
     ArrayList<String> tmp = new ArrayList<String>();
     this.collectors = tmp.iterator();
-    log.info("added a single collector to collector list in ConnectorClient constructor, it's hasNext is now: " + collectors.hasNext());
+    log
+        .info("added a single collector to collector list in ConnectorClient constructor, it's hasNext is now: "
+            + collectors.hasNext());
 
     MAX_RETRIES_PER_COLLECTOR = c.getInt("chukwaAgent.sender.fastRetries", 4);
-    SENDER_RETRIES= c.getInt("chukwaAgent.sender.retries", 144000);
-    WAIT_FOR_COLLECTOR_REBOOT= c.getInt("chukwaAgent.sender.retryInterval", 20*1000);
+    SENDER_RETRIES = c.getInt("chukwaAgent.sender.retries", 144000);
+    WAIT_FOR_COLLECTOR_REBOOT = c.getInt("chukwaAgent.sender.retryInterval",
+        20 * 1000);
   }
-  
+
   /**
    * Set up a single connector for this client to send {@link Chunk}s to
+   * 
    * @param collector the url of the collector
    */
-  public void setCollectors(String collector){
-   }
-  
+  public void setCollectors(String collector) {
+  }
+
   /**
    * Set up a list of connectors for this client to send {@link Chunk}s to
+   * 
    * @param collectors
    */
-  public void setCollectors(Iterator<String> collectors){
-    this.collectors = collectors; 
-    //setup a new destination from our list of collectors if one hasn't been set up
-    if (currCollector == null){
-      if (collectors.hasNext()){
+  public void setCollectors(Iterator<String> collectors) {
+    this.collectors = collectors;
+    // setup a new destination from our list of collectors if one hasn't been
+    // set up
+    if (currCollector == null) {
+      if (collectors.hasNext()) {
         currCollector = collectors.next();
-      }
-      else
-        log.error("No collectors to try in send(), not even trying to do doPost()");
+      } else
+        log
+            .error("No collectors to try in send(), not even trying to do doPost()");
     }
   }
-  
-  
+
   /**
-   * grab all of the chunks currently in the chunkQueue, stores a copy of them locally, calculates
-   * their size, sets them up 
+   * grab all of the chunks currently in the chunkQueue, stores a copy of them
+   * locally, calculates their size, sets them up
+   * 
    * @return array of chunk id's which were ACKed by collector
    */
-  public List<CommitListEntry> send(List<Chunk> toSend) throws InterruptedException, IOException{
+  public List<CommitListEntry> send(List<Chunk> toSend)
+      throws InterruptedException, IOException {
     List<DataOutputBuffer> serializedEvents = new ArrayList<DataOutputBuffer>();
     List<CommitListEntry> commitResults = new ArrayList<CommitListEntry>();
-    
+
     log.info("collected " + toSend.size() + " chunks");
 
-    //Serialize each chunk in turn into it's own DataOutputBuffer and add that buffer to serializedEvents  
-    for(Chunk c: toSend) {
+    // Serialize each chunk in turn into its own DataOutputBuffer and add that
+    // buffer to serializedEvents
+    for (Chunk c : toSend) {
       DataOutputBuffer b = new DataOutputBuffer(c.getSerializedSizeEstimate());
       try {
         c.write(b);
-      }catch(IOException err) {
+      } catch (IOException err) {
         log.error("serialization threw IOException", err);
       }
       serializedEvents.add(b);
-      //store a CLE for this chunk which we will use to ack this chunk to the caller of send()
-      //(e.g. the agent will use the list of CLE's for checkpointing)
+      // store a CLE for this chunk which we will use to ack this chunk to the
+      // caller of send()
+      // (e.g. the agent will use the list of CLE's for checkpointing)
       commitResults.add(new CommitListEntry(c.getInitiator(), c.getSeqID()));
     }
     toSend.clear();
-    
-    //collect all serialized chunks into a single buffer to send
-    RequestEntity postData = new BuffersRequestEntity(serializedEvents);
 
+    // collect all serialized chunks into a single buffer to send
+    RequestEntity postData = new BuffersRequestEntity(serializedEvents);
 
-    int retries = SENDER_RETRIES; 
-    while(currCollector != null)
-    {
-      //need to pick a destination here
+    int retries = SENDER_RETRIES;
+    while (currCollector != null) {
+      // need to pick a destination here
       PostMethod method = new PostMethod();
-      try   {
+      try {
         doPost(method, postData, currCollector);
 
-        retries = SENDER_RETRIES; //reset count on success
-        //if no exception was thrown from doPost, ACK that these chunks were sent
+        retries = SENDER_RETRIES; // reset count on success
+        // if no exception was thrown from doPost, ACK that these chunks were
+        // sent
         return commitResults;
       } catch (Throwable e) {
         log.error("Http post exception", e);
-        log.info("Checking list of collectors to see if another collector has been specified for rollover");
-        if (collectors.hasNext()){
+        log
+            .info("Checking list of collectors to see if another collector has been specified for rollover");
+        if (collectors.hasNext()) {
           currCollector = collectors.next();
-          log.info("Found a new collector to roll over to, retrying HTTP Post to collector " + currCollector);
+          log
+              .info("Found a new collector to roll over to, retrying HTTP Post to collector "
+                  + currCollector);
         } else {
-          if(retries > 0) {
-            log.warn("No more collectors to try rolling over to; waiting " + WAIT_FOR_COLLECTOR_REBOOT +
-                " ms (" + retries + "retries left)");
+          if (retries > 0) {
+            log.warn("No more collectors to try rolling over to; waiting "
+                + WAIT_FOR_COLLECTOR_REBOOT + " ms (" + retries
+                + "retries left)");
             Thread.sleep(WAIT_FOR_COLLECTOR_REBOOT);
-            retries --;
+            retries--;
           } else {
             log.error("No more collectors to try rolling over to; aborting");
             throw new IOException("no collectors");
           }
         }
-      }
-      finally  {
+      } finally {
         // be sure the connection is released back to the connection manager
         method.releaseConnection();
       }
-    } //end retry loop
+    } // end retry loop
     return new ArrayList<CommitListEntry>();
   }
-  
+
   /**
    * Handles the HTTP post. Throws HttpException on failure
    */
   @SuppressWarnings("deprecation")
   private void doPost(PostMethod method, RequestEntity data, String dest)
-      throws IOException, HttpException
-  {
-    
+      throws IOException, HttpException {
+
     HttpMethodParams pars = method.getParams();
-    pars.setParameter (HttpMethodParams.RETRY_HANDLER, (Object) new HttpMethodRetryHandler()
-    {
-      public boolean retryMethod(HttpMethod m, IOException e, int exec)
-      {
-        return !(e instanceof java.net.ConnectException) && (exec < MAX_RETRIES_PER_COLLECTOR);
-      }
-    });
-    
-    pars.setParameter(HttpMethodParams.SO_TIMEOUT , new Integer(30000));
-    
-    
-    
+    pars.setParameter(HttpMethodParams.RETRY_HANDLER,
+        (Object) new HttpMethodRetryHandler() {
+          public boolean retryMethod(HttpMethod m, IOException e, int exec) {
+            return !(e instanceof java.net.ConnectException)
+                && (exec < MAX_RETRIES_PER_COLLECTOR);
+          }
+        });
+
+    pars.setParameter(HttpMethodParams.SO_TIMEOUT, new Integer(30000));
+
     method.setParams(pars);
     method.setPath(dest);
-    
-     //send it across the network
+
+    // send it across the network
     method.setRequestEntity(data);
-    
-    log.info(">>>>>> HTTP post to " + dest+" length = "+ data.getContentLength());
+
+    log.info(">>>>>> HTTP post to " + dest + " length = "
+        + data.getContentLength());
     // Send POST request
-    
-    //client.setTimeout(15*1000);
+
+    // client.setTimeout(15*1000);
     int statusCode = client.executeMethod(method);
-      
-    if (statusCode != HttpStatus.SC_OK)  {
-      log.error(">>>>>> HTTP post response statusCode: " +statusCode + ", statusLine: " + method.getStatusLine());
-      //do something aggressive here
+
+    if (statusCode != HttpStatus.SC_OK) {
+      log.error(">>>>>> HTTP post response statusCode: " + statusCode
+          + ", statusLine: " + method.getStatusLine());
+      // do something aggressive here
       throw new HttpException("got back a failure from server");
     }
-    //implicitly "else"
-    log.info(">>>>>> HTTP Got success back from the remote collector; response length "+ method.getResponseContentLength());
+    // implicitly "else"
+    log
+        .info(">>>>>> HTTP Got success back from the remote collector; response length "
+            + method.getResponseContentLength());
 
-      //FIXME: should parse acks here
+    // FIXME: should parse acks here
     InputStream rstream = null;
-    
+
     // Get the response body
     byte[] resp_buf = method.getResponseBody();
-    rstream = new ByteArrayInputStream(resp_buf); 
+    rstream = new ByteArrayInputStream(resp_buf);
     BufferedReader br = new BufferedReader(new InputStreamReader(rstream));
     String line;
     while ((line = br.readLine()) != null) {
@@ -272,10 +284,12 @@
       }
     }
   }
-  
-  public static void main(String[] argv) throws InterruptedException{
-    //HttpConnectorClient cc = new HttpConnectorClient();
-    //do something smarter than to hide record headaches, like force them to create and add records to a chunk
-    //cc.addChunk("test-source", "test-streamName", "test-application", "test-dataType", new byte[]{1,2,3,4,5});
+
+  public static void main(String[] argv) throws InterruptedException {
+    // HttpConnectorClient cc = new HttpConnectorClient();
+    // do something smarter than to hide record headaches, like force them to
+    // create and add records to a chunk
+    // cc.addChunk("test-source", "test-streamName", "test-application",
+    // "test-dataType", new byte[]{1,2,3,4,5});
   }
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaSender.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaSender.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaSender.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaSender.java Wed Mar 11 22:39:26 2009
@@ -1,12 +1,12 @@
 package org.apache.hadoop.chukwa.datacollection.sender;
 
+
 /**
  * Encapsulates all of the communication overhead needed for chunks to be delivered
  * to a collector.
  */
 import java.util.Iterator;
 import java.util.List;
-
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.datacollection.sender.ChukwaHttpSender.CommitListEntry;
 
@@ -16,10 +16,11 @@
    * 
    * @param chunksToSend a list of chunks to commit
    * @return the list of committed chunks
-   * @throws InterruptedException if interrupted while trying to send 
+   * @throws InterruptedException if interrupted while trying to send
    */
-  public List<CommitListEntry> send(List<Chunk> chunksToSend) throws InterruptedException, java.io.IOException;
-  
+  public List<CommitListEntry> send(List<Chunk> chunksToSend)
+      throws InterruptedException, java.io.IOException;
+
   public void setCollectors(Iterator<String> collectors);
-  
+
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java Wed Mar 11 22:39:26 2009
@@ -18,67 +18,71 @@
 
 package org.apache.hadoop.chukwa.datacollection.sender;
 
+
 import java.io.*;
 import java.net.URL;
 import java.util.*;
-
 import org.apache.hadoop.conf.Configuration;
 
 /***
- * An iterator returning a list of Collectors to try.
- * This class is nondeterministic, since it puts collectors back on the list after some period.
+ * An iterator returning a list of Collectors to try. This class is
+ * nondeterministic, since it puts collectors back on the list after some
+ * period.
+ * 
+ * No node will be polled more than once per maxRetryRateMs milliseconds.
+ * hasNext() will continue to return true if you have not called it recently.
+ * 
  * 
- * No node will be polled more than once per maxRetryRateMs milliseconds. hasNext() will continue return
- * true if you have not called it recently.
- *
- *
  */
 public class RetryListOfCollectors implements Iterator<String> {
 
   int maxRetryRateMs;
   List<String> collectors;
   long lastLookAtFirstNode;
-  int nextCollector=0;
-  private String portNo; 
+  int nextCollector = 0;
+  private String portNo;
   Configuration conf;
-  
-  public RetryListOfCollectors(File collectorFile, int maxRetryRateMs) throws IOException {
+
+  public RetryListOfCollectors(File collectorFile, int maxRetryRateMs)
+      throws IOException {
     this.maxRetryRateMs = maxRetryRateMs;
     lastLookAtFirstNode = 0;
     collectors = new ArrayList<String>();
     conf = new Configuration();
-    portNo = conf.get("chukwaCollector.http.port","8080");
-    
-    try{
-      BufferedReader br  = new BufferedReader(new FileReader(collectorFile));
+    portNo = conf.get("chukwaCollector.http.port", "8080");
+
+    try {
+      BufferedReader br = new BufferedReader(new FileReader(collectorFile));
       String line;
-      while((line = br.readLine()) != null) {
-        if(!line.contains("://")) { 
-        	//no protocol, assume http
-        	if(line.matches(":\\d+")) {
-                collectors.add("http://" + line);
-        	} else {
-                collectors.add("http://" + line + ":" + portNo + "/");
-        	}
+      while ((line = br.readLine()) != null) {
+        if (!line.contains("://")) {
+          // no protocol, assume http
+          if (line.matches(":\\d+")) {
+            collectors.add("http://" + line);
+          } else {
+            collectors.add("http://" + line + ":" + portNo + "/");
+          }
         } else {
-        	if(line.matches(":\\d+")) {
-                collectors.add(line);
-        	} else {
-                collectors.add(line + ":" + portNo + "/");
-        	}
-        	collectors.add(line);
+          if (line.matches(":\\d+")) {
+            collectors.add(line);
+          } else {
+            collectors.add(line + ":" + portNo + "/");
+          }
+          collectors.add(line);
         }
       }
       br.close();
-    }catch(FileNotFoundException e){
-      System.err.println("Error in RetryListOfCollectors() opening file: collectors, double check that you have set the CHUKWA_CONF_DIR environment variable. Also, ensure file exists and is in classpath");
-    }catch(IOException e){
-      System.err.println("I/O error in RetryListOfcollectors instantiation in readLine() from specified collectors file");
+    } catch (FileNotFoundException e) {
+      System.err
+          .println("Error in RetryListOfCollectors() opening file: collectors, double check that you have set the CHUKWA_CONF_DIR environment variable. Also, ensure file exists and is in classpath");
+    } catch (IOException e) {
+      System.err
+          .println("I/O error in RetryListOfcollectors instantiation in readLine() from specified collectors file");
       throw e;
     }
     shuffleList();
   }
-  
+
   public RetryListOfCollectors(final List<String> collectors, int maxRetryRateMs) {
     this.maxRetryRateMs = maxRetryRateMs;
     lastLookAtFirstNode = 0;
@@ -86,54 +90,54 @@
     this.collectors.addAll(collectors);
     shuffleList();
   }
-  
-  //for now, use a simple O(n^2) algorithm.
-  //safe, because we only do this once, and on smalls list
+
+  // for now, use a simple O(n^2) algorithm.
+  // safe, because we only do this once, and on small lists
   private void shuffleList() {
-   ArrayList<String> newList = new  ArrayList<String>();
+    ArrayList<String> newList = new ArrayList<String>();
     Random r = new java.util.Random();
-    while(!collectors.isEmpty()) {
+    while (!collectors.isEmpty()) {
       int toRemove = r.nextInt(collectors.size());
       String next = collectors.remove(toRemove);
       newList.add(next);
     }
     collectors = newList;
   }
-  
+
   public boolean hasNext() {
-    return collectors.size() > 0 && 
-      ( (nextCollector  != 0)  || 
-         (System.currentTimeMillis() - lastLookAtFirstNode > maxRetryRateMs ));
-   }
+    return collectors.size() > 0
+        && ((nextCollector != 0) || (System.currentTimeMillis()
+            - lastLookAtFirstNode > maxRetryRateMs));
+  }
 
   public String next() {
-    if(hasNext())  {
+    if (hasNext()) {
       int currCollector = nextCollector;
-      nextCollector = (nextCollector +1)% collectors.size();
-      if(currCollector == 0)
+      nextCollector = (nextCollector + 1) % collectors.size();
+      if (currCollector == 0)
         lastLookAtFirstNode = System.currentTimeMillis();
       return collectors.get(currCollector);
-    }
-    else
+    } else
       return null;
   }
-  
-  public String getRandomCollector(){
-    return collectors.get( (int)java.lang.Math.random() * collectors.size());
+
+  public String getRandomCollector() {
+    return collectors.get((int) java.lang.Math.random() * collectors.size());
   }
-  
-  public void add(URL collector){
+
+  public void add(URL collector) {
     collectors.add(collector.toString());
   }
 
-  public void remove()  {
+  public void remove() {
     throw new UnsupportedOperationException();
-    //FIXME: maybe just remove a collector from our list and then 
-    //FIXME: make sure next doesn't break (i.e. reset nextCollector if necessary)
+    // FIXME: maybe just remove a collector from our list and then
+    // FIXME: make sure next doesn't break (i.e. reset nextCollector if
+    // necessary)
   }
 
   /**
-   *  
+   * 
    * @return total number of collectors in list
    */
   int total() {

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java Wed Mar 11 22:39:26 2009
@@ -18,80 +18,73 @@
 
 package org.apache.hadoop.chukwa.datacollection.test;
 
+
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.datacollection.*;
 import org.apache.hadoop.chukwa.datacollection.agent.*;
 import org.apache.hadoop.chukwa.datacollection.connector.Connector;
-
 import java.util.*;
 
 /**
- * Output events to stdout.
- * Intended for debugging use.
- *
+ * Output events to stdout. Intended for debugging use.
+ * 
  */
 public class ConsoleOutConnector extends Thread implements Connector {
-  
+
   final ChukwaAgent agent;
   volatile boolean shutdown;
   final boolean silent;
-  
 
   public ConsoleOutConnector(ChukwaAgent a) {
     this(a, false);
   }
-  
-  public ConsoleOutConnector(ChukwaAgent a, boolean silent)
-  {
+
+  public ConsoleOutConnector(ChukwaAgent a, boolean silent) {
     agent = a;
     this.silent = silent;
   }
-  
-  public void run()
-  {
-    try{
+
+  public void run() {
+    try {
       System.out.println("console connector started");
       ChunkQueue eventQueue = DataFactory.getInstance().getEventQueue();
-      if(!silent)
+      if (!silent)
         System.out.println("-------------------");
-      
-      while(!shutdown)
-      {
+
+      while (!shutdown) {
         List<Chunk> evts = new ArrayList<Chunk>();
         eventQueue.collect(evts, 1);
-        
-        for(Chunk e: evts)
-        {
-          if(!silent) {
-            System.out.println("Console out connector got event at offset " + e.getSeqID());
+
+        for (Chunk e : evts) {
+          if (!silent) {
+            System.out.println("Console out connector got event at offset "
+                + e.getSeqID());
             System.out.println("data type was " + e.getDataType());
-            if(e.getData().length > 1000)
-              System.out.println("data length was " + e.getData().length+ ", not printing");
+            if (e.getData().length > 1000)
+              System.out.println("data length was " + e.getData().length
+                  + ", not printing");
             else
               System.out.println(new String(e.getData()));
           }
-          
+
           agent.reportCommit(e.getInitiator(), e.getSeqID());
-         
-          if(!silent)
+
+          if (!silent)
             System.out.println("-------------------");
         }
       }
-    }
-    catch(InterruptedException e)
-    {} //thread is about to exit anyway
+    } catch (InterruptedException e) {
+    } // thread is about to exit anyway
   }
 
-  public void shutdown()
-  {
+  public void shutdown() {
     shutdown = true;
     this.interrupt();
   }
 
-@Override
-public void reloadConfiguration()
-{
-	System.out.println("reloadConfiguration");
-}
+  @Override
+  public void reloadConfiguration() {
+    System.out.println("reloadConfiguration");
+  }
 
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java Wed Mar 11 22:39:26 2009
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.chukwa.datacollection.test;
 
+
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
 import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
@@ -32,66 +33,66 @@
 
 public class FileTailerStressTest {
 
-  static final int DELAY_MIN = 10*1000;
-  static final int DELAY_RANGE = 2* 1000;
-  
-  static class OccasionalWriterThread extends Thread
-  {
+  static final int DELAY_MIN = 10 * 1000;
+  static final int DELAY_RANGE = 2 * 1000;
+
+  static class OccasionalWriterThread extends Thread {
     File file;
-    
-    OccasionalWriterThread(File f)  {
+
+    OccasionalWriterThread(File f) {
       file = f;
     }
-    
-    public void run()  {
+
+    public void run() {
       try {
-      FileOutputStream fos = new FileOutputStream(file);
-      PrintWriter out = new PrintWriter(fos);
-      Random rand = new Random();
-      while(true) {
-        int delay = rand.nextInt( DELAY_RANGE ) + DELAY_MIN;
-        Thread.sleep(delay);
-        Date d = new Date();
-        out.println("some test data written at " + d.toString());
-        out.flush();
-      }
-      } catch(IOException e) {
+        FileOutputStream fos = new FileOutputStream(file);
+        PrintWriter out = new PrintWriter(fos);
+        Random rand = new Random();
+        while (true) {
+          int delay = rand.nextInt(DELAY_RANGE) + DELAY_MIN;
+          Thread.sleep(delay);
+          Date d = new Date();
+          out.println("some test data written at " + d.toString());
+          out.flush();
+        }
+      } catch (IOException e) {
         e.printStackTrace();
       } catch (InterruptedException e) {
       }
     }
   }
-    
-static int FILES_TO_USE  = 100;
+
+  static int FILES_TO_USE = 100;
+
   /**
    * @param args
    */
-  public static void main(String[] args)
-  {
-    try{
+  public static void main(String[] args) {
+    try {
       Server server = new Server(9990);
-      Context root = new Context(server,"/",Context.SESSIONS);
-  
+      Context root = new Context(server, "/", Context.SESSIONS);
+
       ServletCollector.setWriter(new ConsoleWriter(true));
-      root.addServlet(new ServletHolder(new ServletCollector(new ChukwaConfiguration(true))), "/*");
+      root.addServlet(new ServletHolder(new ServletCollector(
+          new ChukwaConfiguration(true))), "/*");
       server.start();
       server.setStopAtShutdown(false);
-  
+
       Thread.sleep(1000);
       ChukwaAgent agent = new ChukwaAgent();
-      HttpConnector connector = new HttpConnector(agent, "http://localhost:9990/chukwa");
+      HttpConnector connector = new HttpConnector(agent,
+          "http://localhost:9990/chukwa");
       connector.start();
-      
+
       ChukwaConfiguration cc = new ChukwaConfiguration();
       int portno = cc.getInt("chukwaAgent.control.port", 9093);
       ChukwaAgentController cli = new ChukwaAgentController("localhost", portno);
-      
 
       File workdir = new File("/tmp/stresstest/");
       workdir.mkdir();
-      for(int i = 0; i < FILES_TO_USE; ++i) {
-        File newTestF = new File( "/tmp/stresstest/" + i);
-        
+      for (int i = 0; i < FILES_TO_USE; ++i) {
+        File newTestF = new File("/tmp/stresstest/" + i);
+
         newTestF.deleteOnExit();
         (new OccasionalWriterThread(newTestF)).start();
         cli.addFile("test-lines", newTestF.getAbsolutePath());
@@ -100,7 +101,7 @@
       Thread.sleep(60 * 1000);
       System.out.println("cleaning up");
       workdir.delete();
-    } catch(Exception e) {
+    } catch (Exception e) {
       e.printStackTrace();
     }
   }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/SinkFileValidator.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/SinkFileValidator.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/SinkFileValidator.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/SinkFileValidator.java Wed Mar 11 22:39:26 2009
@@ -18,8 +18,8 @@
 
 package org.apache.hadoop.chukwa.datacollection.test;
 
-import java.net.URI;
 
+import java.net.URI;
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.conf.Configuration;
@@ -29,59 +29,53 @@
 import org.apache.hadoop.io.Writable;
 
 public class SinkFileValidator {
-  
-  public static void main(String[] args)
-  {
+
+  public static void main(String[] args) {
     String fsURL = "hdfs://localhost:9000";
     String fname;
-    if(args.length < 1)
-    {
-      System.out.println("usage:  SinkFileValidator <filename> [filesystem URI] ");
+    if (args.length < 1) {
+      System.out
+          .println("usage:  SinkFileValidator <filename> [filesystem URI] ");
       System.exit(0);
     }
     fname = args[0];
-    if(args.length > 1)
+    if (args.length > 1)
       fsURL = args[1];
 
     Configuration conf = new Configuration();
-    try
-    {
-    FileSystem fs;
-    if(fsURL.equals("local"))
-      fs = FileSystem.getLocal(conf);
-    else
-       fs= FileSystem.get(new URI(fsURL), conf);
-    SequenceFile.Reader r= new SequenceFile.Reader(fs, new Path(fname), conf);
-    System.out.println("key class name is " + r.getKeyClassName());
-    System.out.println("value class name is " + r.getValueClassName());
-    
-    ChukwaArchiveKey key = new ChukwaArchiveKey();
-    ChunkImpl evt =  ChunkImpl.getBlankChunk();
-    int events = 0;
-    while(r.next(key, evt) &&  (events < 5))
-    {
-      if(!Writable.class.isAssignableFrom(key.getClass()))
-        System.out.println("warning: keys aren't writable");
-      
-      if(!Writable.class.isAssignableFrom(evt.getClass()))
-        System.out.println("warning: values aren't writable");
-      
-      if(evt.getData().length > 1000)
-      {
-        System.out.println("got event; data: " + new String(evt.getData(), 0, 1000));
-        System.out.println("....[truncating]");
-      }
+    try {
+      FileSystem fs;
+      if (fsURL.equals("local"))
+        fs = FileSystem.getLocal(conf);
       else
-        System.out.println("got event; data: " + new String(evt.getData()));
-      events ++;
-    }
-    System.out.println("file looks OK!");
-    }
-    catch(Exception e)
-    {
+        fs = FileSystem.get(new URI(fsURL), conf);
+      SequenceFile.Reader r = new SequenceFile.Reader(fs, new Path(fname), conf);
+      System.out.println("key class name is " + r.getKeyClassName());
+      System.out.println("value class name is " + r.getValueClassName());
+
+      ChukwaArchiveKey key = new ChukwaArchiveKey();
+      ChunkImpl evt = ChunkImpl.getBlankChunk();
+      int events = 0;
+      while (r.next(key, evt) && (events < 5)) {
+        if (!Writable.class.isAssignableFrom(key.getClass()))
+          System.out.println("warning: keys aren't writable");
+
+        if (!Writable.class.isAssignableFrom(evt.getClass()))
+          System.out.println("warning: values aren't writable");
+
+        if (evt.getData().length > 1000) {
+          System.out.println("got event; data: "
+              + new String(evt.getData(), 0, 1000));
+          System.out.println("....[truncating]");
+        } else
+          System.out.println("got event; data: " + new String(evt.getData()));
+        events++;
+      }
+      System.out.println("file looks OK!");
+    } catch (Exception e) {
       e.printStackTrace();
     }
-    
+
   }
 
 }