You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by as...@apache.org on 2009/03/11 23:39:32 UTC
svn commit: r752666 [4/16] - in /hadoop/chukwa/trunk: ./
src/java/org/apache/hadoop/chukwa/ src/java/org/apache/hadoop/chukwa/conf/
src/java/org/apache/hadoop/chukwa/database/
src/java/org/apache/hadoop/chukwa/datacollection/
src/java/org/apache/hadoop...
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletDiagnostics.java Wed Mar 11 22:39:26 2009
@@ -17,11 +17,10 @@
*/
package org.apache.hadoop.chukwa.datacollection.collector.servlet;
-import java.io.PrintStream;
+import java.io.PrintStream;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.log4j.Logger;
-
import java.util.*;
/**
@@ -29,42 +28,41 @@
*/
public class ServletDiagnostics {
- static Logger log= Logger.getLogger(ServletDiagnostics.class);
-
+ static Logger log = Logger.getLogger(ServletDiagnostics.class);
static int CHUNKS_TO_KEEP = 50;
static int CHUNKS_TO_DISPLAY = 50;
-
- private static class PostStats { //statistics about a chunk
- public PostStats(String src, int count, long receivedTs)
- {
+
+ private static class PostStats { // statistics about a chunk
+ public PostStats(String src, int count, long receivedTs) {
this.count = count;
this.src = src;
this.receivedTs = receivedTs;
types = new String[count];
names = new String[count];
lengths = new int[count];
-
+
seenChunkCount = 0;
dataSize = 0;
}
+
final int count;
final String src;
final long receivedTs;
final String[] types, names;
final int[] lengths;
-
+
int seenChunkCount;
long dataSize;
- public void addChunk(ChunkImpl c, int position)
- {
- if(position != seenChunkCount)
- log.warn("servlet collector is passing chunk " + position + " but diagnostics has seen" +
- seenChunkCount);
- else if(seenChunkCount >= count){
- log.warn("too many chunks in post declared as length " +count);
+
+ public void addChunk(ChunkImpl c, int position) {
+ if (position != seenChunkCount)
+ log.warn("servlet collector is passing chunk " + position
+ + " but diagnostics has seen" + seenChunkCount);
+ else if (seenChunkCount >= count) {
+ log.warn("too many chunks in post declared as length " + count);
} else {
- types[seenChunkCount] = c.getDataType();
+ types[seenChunkCount] = c.getDataType();
lengths[seenChunkCount] = c.getData().length;
names[seenChunkCount] = c.getStreamName();
dataSize += c.getData().length;
@@ -72,7 +70,7 @@
}
}
}
-
+
static {
lastPosts = new LinkedList<PostStats>();
}
@@ -80,74 +78,77 @@
static LinkedList<PostStats> lastPosts;
PostStats curPost;
-
public void sawPost(String source, int chunks, long receivedTs) {
- if(curPost != null) {
+ if (curPost != null) {
log.warn("should only have one HTTP post per ServletDiagnostics");
doneWithPost();
}
curPost = new PostStats(source, chunks, receivedTs);
}
-
+
public void sawChunk(ChunkImpl c, int pos) {
curPost.addChunk(c, pos);
}
public static void printPage(PrintStream out) {
-
- HashMap<String, Long> bytesFromHost = new HashMap<String, Long>();
+
+ HashMap<String, Long> bytesFromHost = new HashMap<String, Long>();
long timeWindowOfSample = Long.MAX_VALUE;
long now = System.currentTimeMillis();
out.println("<ul>");
-
- synchronized(lastPosts) {
- int toSkip = lastPosts.size() - CHUNKS_TO_DISPLAY;
-
- if(!lastPosts.isEmpty())
- timeWindowOfSample = now - lastPosts.peek().receivedTs;
-
- for(PostStats stats: lastPosts) {
+
+ synchronized (lastPosts) {
+ int toSkip = lastPosts.size() - CHUNKS_TO_DISPLAY;
+
+ if (!lastPosts.isEmpty())
+ timeWindowOfSample = now - lastPosts.peek().receivedTs;
+
+ for (PostStats stats : lastPosts) {
Long oldBytes = bytesFromHost.get(stats.src);
long newBytes = stats.dataSize;
- if(oldBytes != null)
+ if (oldBytes != null)
newBytes += oldBytes;
bytesFromHost.put(stats.src, newBytes);
-
- if( -- toSkip < 0) { //done skipping
+
+ if (--toSkip < 0) { // done skipping
out.print("<li>");
-
- out.print(stats.dataSize + " bytes from " + stats.src + " at timestamp " + stats.receivedTs);
- out.println(" which was " + ((now - stats.receivedTs)/ 1000) + " seconds ago");
-
+
+ out.print(stats.dataSize + " bytes from " + stats.src
+ + " at timestamp " + stats.receivedTs);
+ out.println(" which was " + ((now - stats.receivedTs) / 1000)
+ + " seconds ago");
+
out.println("<ol>");
- for(int i =0; i < stats.count; ++i)
- out.println("<li> "+ stats.lengths[i] + " bytes of type " +
- stats.types[i] + ". Adaptor name ="+ stats.names[i] +" </li>");
+ for (int i = 0; i < stats.count; ++i)
+ out.println("<li> " + stats.lengths[i] + " bytes of type "
+ + stats.types[i] + ". Adaptor name =" + stats.names[i]
+ + " </li>");
out.println("</ol></li>");
}
}
}
out.println("</ul>");
out.println("<ul>");
- for(Map.Entry<String, Long> h: bytesFromHost.entrySet()) {
- out.print("<li>rate from " + h.getKey() + " was " + (1000 * h.getValue() / timeWindowOfSample));
- out.println(" bytes/second in last " + timeWindowOfSample/1000 + " seconds.</li>");
+ for (Map.Entry<String, Long> h : bytesFromHost.entrySet()) {
+ out.print("<li>rate from " + h.getKey() + " was "
+ + (1000 * h.getValue() / timeWindowOfSample));
+ out.println(" bytes/second in last " + timeWindowOfSample / 1000
+ + " seconds.</li>");
}
-
- out.println("</ul>");
+ out.println("</ul>");
out.println("total of " + bytesFromHost.size() + " unique hosts seen");
out.println("<p>current time is " + System.currentTimeMillis() + " </p>");
}
public void doneWithPost() {
- synchronized(lastPosts) {
- if(lastPosts.size() > CHUNKS_TO_KEEP)
+ synchronized (lastPosts) {
+ if (lastPosts.size() > CHUNKS_TO_KEEP)
lastPosts.removeFirst();
lastPosts.add(curPost);
}
}
-
+
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/Connector.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/Connector.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/Connector.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/Connector.java Wed Mar 11 22:39:26 2009
@@ -18,30 +18,31 @@
package org.apache.hadoop.chukwa.datacollection.connector;
+
/**
- * This class is responsible for setting up a long living process that repeatedly calls the
- * <code>send</code> function of a Sender.
+ * This class is responsible for setting up a long living process that
+ * repeatedly calls the <code>send</code> function of a Sender.
*/
-public interface Connector
-{
- static final int proxyTimestampField = 0;
- /**
+public interface Connector {
+ static final int proxyTimestampField = 0;
+ /**
*
*/
- static final int proxyURIField = 1;
- static final int proxyRetryField = 2;
-
- static final int adaptorTimestampField = 3;
- static final int adaptorURIField = 4;
-
- static final int logTimestampField = 5;
- static final int logSourceField = 6;
- static final int logApplicationField = 7;
- static final int logEventField = 8;
-
-
- public void start();
- public void shutdown();
- public void reloadConfiguration();
+ static final int proxyURIField = 1;
+ static final int proxyRetryField = 2;
+
+ static final int adaptorTimestampField = 3;
+ static final int adaptorURIField = 4;
+
+ static final int logTimestampField = 5;
+ static final int logSourceField = 6;
+ static final int logApplicationField = 7;
+ static final int logEventField = 8;
+
+ public void start();
+
+ public void shutdown();
+
+ public void reloadConfiguration();
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java Wed Mar 11 22:39:26 2009
@@ -18,6 +18,7 @@
package org.apache.hadoop.chukwa.datacollection.connector.http;
+
/**
* This class is responsible for setting up a {@link HttpConnectorClient} with a list of collectors
* and then repeatedly calling its send function which encapsulates the work of setting up the
@@ -31,7 +32,7 @@
* On error, tries the list of available collectors, pauses for a minute, and then repeats.
* </p>
* <p> Will wait forever for collectors to come up. </p>
-
+
*/
import java.io.IOException;
@@ -40,7 +41,6 @@
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
-
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
import org.apache.hadoop.chukwa.datacollection.DataFactory;
@@ -49,155 +49,154 @@
import org.apache.hadoop.chukwa.datacollection.sender.*;
import org.apache.log4j.Logger;
+public class HttpConnector implements Connector, Runnable {
-public class HttpConnector implements Connector, Runnable {
-
- static Logger log = Logger.getLogger(HttpConnector.class);
+ static Logger log = Logger.getLogger(HttpConnector.class);
static Timer statTimer = null;
static volatile int chunkCount = 0;
- static final int MAX_SIZE_PER_POST = 2*1024*1024;
- static final int MIN_POST_INTERVAL= 5 * 1000;
+ static final int MAX_SIZE_PER_POST = 2 * 1024 * 1024;
+ static final int MIN_POST_INTERVAL = 5 * 1000;
static ChunkQueue chunkQueue;
-
+
ChukwaAgent agent;
String argDestination = null;
-
+
private volatile boolean stopMe = false;
private boolean reloadConfiguration = false;
private Iterator<String> collectors = null;
protected ChukwaSender connectorClient = null;
-
- static{
+
+ static {
statTimer = new Timer();
chunkQueue = DataFactory.getInstance().getEventQueue();
statTimer.schedule(new TimerTask() {
public void run() {
int count = chunkCount;
- chunkCount = 0;
- log.info("# http chunks ACK'ed since last report: " + count );
+ chunkCount = 0;
+ log.info("# http chunks ACK'ed since last report: " + count);
}
- }, 100,60*1000);
+ }, 100, 60 * 1000);
+ }
+
+ public HttpConnector(ChukwaAgent agent) {
+ this.agent = agent;
+ }
+
+ public HttpConnector(ChukwaAgent agent, String destination) {
+ this.agent = agent;
+ this.argDestination = destination;
+
+ log.info("Setting HTTP Connector URL manually using arg passed to Agent: "
+ + destination);
+ }
+
+ public void start() {
+ (new Thread(this, "HTTP post thread")).start();
+ }
+
+ public void shutdown() {
+ stopMe = true;
+ }
+
+ public void run() {
+ log.info("HttpConnector started at time:" + System.currentTimeMillis());
+
+ Iterator<String> destinations = null;
+
+ // build a list of our destinations from collectors
+ try {
+ destinations = DataFactory.getInstance().getCollectorURLs();
+ } catch (IOException e) {
+ log.error("Failed to retreive list of collectors from "
+ + "conf/collectors file", e);
+ }
+
+ connectorClient = new ChukwaHttpSender(agent.getConfiguration());
+
+ if (argDestination != null) {
+ ArrayList<String> tmp = new ArrayList<String>();
+ tmp.add(argDestination);
+ collectors = tmp.iterator();
+ connectorClient.setCollectors(collectors);
+ log.info("using collector specified at agent runtime: " + argDestination);
+ } else if (destinations != null && destinations.hasNext()) {
+ collectors = destinations;
+ connectorClient.setCollectors(destinations);
+ log.info("using collectors from collectors file");
+ } else {
+ log.error("No collectors specified, exiting (and taking agent with us).");
+ agent.shutdown(true);// error is unrecoverable, so stop hard.
+ return;
+ }
+
+ try {
+ long lastPost = System.currentTimeMillis();
+ while (!stopMe) {
+ List<Chunk> newQueue = new ArrayList<Chunk>();
+ try {
+ // get all ready chunks from the chunkQueue to be sent
+ chunkQueue.collect(newQueue, MAX_SIZE_PER_POST); // FIXME: should
+ // really do this by
+ // size
+
+ } catch (InterruptedException e) {
+ System.out.println("thread interrupted during addChunks(ChunkQueue)");
+ Thread.currentThread().interrupt();
+ break;
+ }
+ int toSend = newQueue.size();
+ List<ChukwaHttpSender.CommitListEntry> results = connectorClient
+ .send(newQueue);
+ log.info("sent " + toSend + " chunks, got back " + results.size()
+ + " acks");
+ // checkpoint the chunks which were committed
+ for (ChukwaHttpSender.CommitListEntry cle : results) {
+ agent.reportCommit(cle.adaptor, cle.uuid);
+ chunkCount++;
+ }
+
+ if (reloadConfiguration) {
+ connectorClient.setCollectors(collectors);
+ log.info("Resetting colectors");
+ reloadConfiguration = false;
+ }
+
+ long now = System.currentTimeMillis();
+ if (now - lastPost < MIN_POST_INTERVAL)
+ Thread.sleep(now - lastPost); // wait for stuff to accumulate
+ lastPost = now;
+ } // end of try forever loop
+ log
+ .info("received stop() command so exiting run() loop to shutdown connector");
+ } catch (OutOfMemoryError e) {
+ log.warn("Bailing out", e);
+ System.exit(-1);
+ } catch (InterruptedException e) {
+ // do nothing, let thread die.
+ log.warn("Bailing out", e);
+ System.exit(-1);
+ } catch (java.io.IOException e) {
+ log.error("connector failed; shutting down agent");
+ agent.shutdown(true);
+ }
+ }
+
+ @Override
+ public void reloadConfiguration() {
+ reloadConfiguration = true;
+ Iterator<String> destinations = null;
+
+ // build a list of our destinations from collectors
+ try {
+ destinations = DataFactory.getInstance().getCollectorURLs();
+ } catch (IOException e) {
+ log.error(
+ "Failed to retreive list of collectors from conf/collectors file", e);
+ }
+ if (destinations != null && destinations.hasNext()) {
+ collectors = destinations;
+ }
+
}
-
- public HttpConnector(ChukwaAgent agent) {
- this.agent = agent;
- }
-
- public HttpConnector(ChukwaAgent agent, String destination) {
- this.agent = agent;
- this.argDestination = destination;
-
- log.info("Setting HTTP Connector URL manually using arg passed to Agent: " + destination);
- }
-
- public void start() {
- (new Thread(this, "HTTP post thread")).start();
- }
-
- public void shutdown(){
- stopMe = true;
- }
-
- public void run(){
- log.info("HttpConnector started at time:" + System.currentTimeMillis());
-
- Iterator<String> destinations = null;
-
- // build a list of our destinations from collectors
- try{
- destinations = DataFactory.getInstance().getCollectorURLs();
- } catch (IOException e){
- log.error("Failed to retreive list of collectors from " +
- "conf/collectors file", e);
- }
-
- connectorClient = new ChukwaHttpSender(agent.getConfiguration());
-
- if (argDestination != null)
- {
- ArrayList<String> tmp = new ArrayList<String>();
- tmp.add(argDestination);
- collectors = tmp.iterator();
- connectorClient.setCollectors(collectors);
- log.info("using collector specified at agent runtime: " + argDestination);
- }
- else if (destinations != null && destinations.hasNext())
- {
- collectors = destinations;
- connectorClient.setCollectors(destinations);
- log.info("using collectors from collectors file");
- }
- else {
- log.error("No collectors specified, exiting (and taking agent with us).");
- agent.shutdown(true);//error is unrecoverable, so stop hard.
- return;
- }
-
- try {
- long lastPost = System.currentTimeMillis();
- while(!stopMe) {
- List<Chunk> newQueue = new ArrayList<Chunk>();
- try {
- //get all ready chunks from the chunkQueue to be sent
- chunkQueue.collect(newQueue,MAX_SIZE_PER_POST); //FIXME: should really do this by size
-
- } catch(InterruptedException e) {
- System.out.println("thread interrupted during addChunks(ChunkQueue)");
- Thread.currentThread().interrupt();
- break;
- }
- int toSend = newQueue.size();
- List<ChukwaHttpSender.CommitListEntry> results = connectorClient.send(newQueue);
- log.info("sent " +toSend + " chunks, got back " + results.size() + " acks");
- //checkpoint the chunks which were committed
- for(ChukwaHttpSender.CommitListEntry cle : results) {
- agent.reportCommit(cle.adaptor, cle.uuid);
- chunkCount++;
- }
-
- if (reloadConfiguration)
- {
- connectorClient.setCollectors(collectors);
- log.info("Resetting colectors");
- reloadConfiguration = false;
- }
-
- long now = System.currentTimeMillis();
- if( now - lastPost < MIN_POST_INTERVAL )
- Thread.sleep(now - lastPost); //wait for stuff to accumulate
- lastPost = now;
- } //end of try forever loop
- log.info("received stop() command so exiting run() loop to shutdown connector");
- } catch(OutOfMemoryError e) {
- log.warn("Bailing out",e);
- System.exit(-1);
- } catch(InterruptedException e) {
- //do nothing, let thread die.
- log.warn("Bailing out",e);
- System.exit(-1);
- }catch(java.io.IOException e) {
- log.error("connector failed; shutting down agent");
- agent.shutdown(true);
- }
- }
-
- @Override
- public void reloadConfiguration()
- {
- reloadConfiguration = true;
- Iterator<String> destinations = null;
-
- // build a list of our destinations from collectors
- try{
- destinations = DataFactory.getInstance().getCollectorURLs();
- } catch (IOException e){
- log.error("Failed to retreive list of collectors from conf/collectors file", e);
- }
- if (destinations != null && destinations.hasNext())
- {
- collectors = destinations;
- }
-
- }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/controller/ChukwaAgentController.java Wed Mar 11 22:39:26 2009
@@ -34,29 +34,29 @@
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
-
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.log4j.Logger;
/**
- * A convenience library for applications to communicate to the {@link ChukwaAgent}. Can be used
- * to register and unregister new {@link Adaptor}s. Also contains functions for applications to
- * use for handling log rations.
+ * A convenience library for applications to communicate to the
+ * {@link ChukwaAgent}. Can be used to register and unregister new
+ * {@link Adaptor}s. Also contains functions for applications to use for
+ * handling log rotations.
*/
public class ChukwaAgentController {
- static Logger log = Logger.getLogger(ChukwaAgentController.class);
- public class AddAdaptorTask extends TimerTask
- {
-
- String adaptorName;
- String type;
- String params;
- long offset;
- long numRetries;
- long retryInterval;
-
- AddAdaptorTask(String adaptorName, String type, String params,
- long offset, long numRetries, long retryInterval){
+ static Logger log = Logger.getLogger(ChukwaAgentController.class);
+
+ public class AddAdaptorTask extends TimerTask {
+
+ String adaptorName;
+ String type;
+ String params;
+ long offset;
+ long numRetries;
+ long retryInterval;
+
+ AddAdaptorTask(String adaptorName, String type, String params, long offset,
+ long numRetries, long retryInterval) {
this.adaptorName = adaptorName;
this.type = type;
this.params = params;
@@ -64,354 +64,398 @@
this.numRetries = numRetries;
this.retryInterval = retryInterval;
}
+
@Override
- public void run()
- {
- try
- {
- log.info("Trying to resend the add command [" + adaptorName + "][" + offset + "][" + params +"] [" + numRetries+"]");
- add(adaptorName, type, params, offset, numRetries, retryInterval);
- }
- catch(Exception e)
- {
- log.warn("Exception in AddAdaptorTask.run", e);
- e.printStackTrace();
- }
+ public void run() {
+ try {
+ log.info("Trying to resend the add command [" + adaptorName + "]["
+ + offset + "][" + params + "] [" + numRetries + "]");
+ add(adaptorName, type, params, offset, numRetries, retryInterval);
+ } catch (Exception e) {
+ log.warn("Exception in AddAdaptorTask.run", e);
+ e.printStackTrace();
+ }
}
}
- //our default adaptors, provided here for convenience
+ // our default adaptors, provided here for convenience
public static final String CharFileTailUTF8 = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8";
public static final String CharFileTailUTF8NewLineEscaped = "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped";
-
-
+
static String DEFAULT_FILE_TAILER = CharFileTailUTF8NewLineEscaped;
static int DEFAULT_PORT = 9093;
static String DEFAULT_HOST = "localhost";
static int numArgs = 0;
-
- class Adaptor{
- public long id = -1;
+
+ class Adaptor {
+ public long id = -1;
final public String name;
final public String params;
final public String appType;
- public long offset;
+ public long offset;
-
- Adaptor(String adaptorName, String appType, String params, long offset){
+ Adaptor(String adaptorName, String appType, String params, long offset) {
this.name = adaptorName;
this.appType = appType;
this.params = params;
this.offset = offset;
}
-
- Adaptor(long id, String adaptorName, String appType, String params, long offset){
+
+ Adaptor(long id, String adaptorName, String appType, String params,
+ long offset) {
this.id = id;
this.name = adaptorName;
this.appType = appType;
this.params = params;
this.offset = offset;
}
-
+
/**
- * Registers this {@link Adaptor} with the agent running at the specified hostname and portno
- * @return The id number of the this {@link Adaptor}, assigned by the agent upon successful registration
+ * Registers this {@link Adaptor} with the agent running at the specified
+ * hostname and portno
+ *
+ * @return The id number of this {@link Adaptor}, assigned by the agent
+ * upon successful registration
* @throws IOException
*/
- long register() throws IOException{
+ long register() throws IOException {
Socket s = new Socket(hostname, portno);
- PrintWriter bw = new PrintWriter(new OutputStreamWriter(s.getOutputStream()));
+ PrintWriter bw = new PrintWriter(new OutputStreamWriter(s
+ .getOutputStream()));
bw.println("ADD " + name + " " + appType + " " + params + " " + offset);
bw.flush();
- BufferedReader br = new BufferedReader(new InputStreamReader(s.getInputStream()));
+ BufferedReader br = new BufferedReader(new InputStreamReader(s
+ .getInputStream()));
String resp = br.readLine();
- if(resp != null){
+ if (resp != null) {
String[] fields = resp.split(" ");
- if(fields[0].equals("OK")){
- try{
- id = Long.parseLong(fields[fields.length -1]);
+ if (fields[0].equals("OK")) {
+ try {
+ id = Long.parseLong(fields[fields.length - 1]);
+ } catch (NumberFormatException e) {
}
- catch (NumberFormatException e){}
}
}
s.close();
return id;
}
-
- void unregister() throws IOException{
+
+ void unregister() throws IOException {
Socket s = new Socket(hostname, portno);
- PrintWriter bw = new PrintWriter(new OutputStreamWriter(s.getOutputStream()));
+ PrintWriter bw = new PrintWriter(new OutputStreamWriter(s
+ .getOutputStream()));
bw.println("SHUTDOWN " + id);
bw.flush();
- BufferedReader br = new BufferedReader(new InputStreamReader(s.getInputStream()));
+ BufferedReader br = new BufferedReader(new InputStreamReader(s
+ .getInputStream()));
String resp = br.readLine();
- if( resp == null || !resp.startsWith("OK"))
- {
- //error. What do we do?
- } else if (resp.startsWith("OK")){
+ if (resp == null || !resp.startsWith("OK")) {
+ // error. What do we do?
+ } else if (resp.startsWith("OK")) {
String[] respSplit = resp.split(" ");
- String newOffset = respSplit[respSplit.length-1];
+ String newOffset = respSplit[respSplit.length - 1];
try {
offset = Long.parseLong(newOffset);
- }catch (NumberFormatException nfe){
+ } catch (NumberFormatException nfe) {
System.err.println("adaptor didn't shutdown gracefully.\n" + nfe);
}
}
-
+
s.close();
}
-
- public String toString(){
+
+ public String toString() {
String[] namePieces = name.split("\\.");
- String shortName = namePieces[namePieces.length-1];
- return id + " " + shortName + " " + appType + " " + params + " " + offset;
+ String shortName = namePieces[namePieces.length - 1];
+ return id + " " + shortName + " " + appType + " " + params + " " + offset;
}
}
-
- Map<Long, ChukwaAgentController.Adaptor> runningAdaptors = new HashMap<Long,Adaptor>();
+
+ Map<Long, ChukwaAgentController.Adaptor> runningAdaptors = new HashMap<Long, Adaptor>();
Map<Long, ChukwaAgentController.Adaptor> pausedAdaptors;
String hostname;
int portno;
-
-
- public ChukwaAgentController(){
+
+ public ChukwaAgentController() {
portno = DEFAULT_PORT;
hostname = DEFAULT_HOST;
- pausedAdaptors = new HashMap<Long,Adaptor>();
-
+ pausedAdaptors = new HashMap<Long, Adaptor>();
+
syncWithAgent();
}
-
- public ChukwaAgentController(String hostname, int portno)
- {
+
+ public ChukwaAgentController(String hostname, int portno) {
this.hostname = hostname;
this.portno = portno;
- pausedAdaptors = new HashMap<Long,Adaptor>();
+ pausedAdaptors = new HashMap<Long, Adaptor>();
syncWithAgent();
}
private boolean syncWithAgent() {
- //set up adaptors by using list here
- try{
+ // set up adaptors by using list here
+ try {
runningAdaptors = list();
return true;
- }catch(IOException e){
- System.err.println("Error initializing ChukwaClient with list of " +
- "currently registered adaptors, clearing our local list of adaptors");
- //e.printStackTrace();
- //if we can't connect to the LocalAgent, reset/clear our local view of the Adaptors.
- runningAdaptors = new HashMap<Long,ChukwaAgentController.Adaptor>();
+ } catch (IOException e) {
+ System.err
+ .println("Error initializing ChukwaClient with list of "
+ + "currently registered adaptors, clearing our local list of adaptors");
+ // e.printStackTrace();
+ // if we can't connect to the LocalAgent, reset/clear our local view of
+ // the Adaptors.
+ runningAdaptors = new HashMap<Long, ChukwaAgentController.Adaptor>();
return false;
}
}
-
+
/**
- * Registers a new adaptor. Makes no guarantee about success. On failure,
- * we print a message to stderr and ignore silently so that an application
+ * Registers a new adaptor. Makes no guarantee about success. On failure, we
+ * print a message to stderr and ignore silently so that an application
* doesn't crash if it's attempt to register an adaptor fails. This call does
* not retry a conection. for that use the overloaded version of this which
* accepts a time interval and number of retries
+ *
+ * @return the id number of the adaptor, generated by the agent
+ */
+ public long add(String adaptorName, String type, String params, long offset) {
+ return add(adaptorName, type, params, offset, 20, 15 * 1000);// retry for
+ // five
+ // minutes,
+ // every
+ // fifteen
+ // seconds
+ }
+
+ /**
+ * Registers a new adaptor. Makes no guarantee about success. On failure, to
+ * connect to server, will retry <code>numRetries</code> times, every
+ * <code>retryInterval</code> milliseconds.
+ *
+ * @return the id number of the adaptor, generated by the agent
+ */
+ public long add(String adaptorName, String type, String params, long offset,
+ long numRetries, long retryInterval) {
+ ChukwaAgentController.Adaptor adaptor = new ChukwaAgentController.Adaptor(
+ adaptorName, type, params, offset);
+ long adaptorID = -1;
+ if (numRetries >= 0) {
+ try {
+ adaptorID = adaptor.register();
+
+ if (adaptorID > 0) {
+ runningAdaptors.put(adaptorID, adaptor);
+ } else {
+ System.err
+ .println("Failed to successfully add the adaptor in AgentClient, adaptorID returned by add() was negative.");
+ }
+ } catch (IOException ioe) {
+ System.out.println("AgentClient failed to contact the agent ("
+ + hostname + ":" + portno + ")");
+ System.out
+ .println("Scheduling a agent connection retry for adaptor add() in another "
+ + retryInterval
+ + " milliseconds, "
+ + numRetries
+ + " retries remaining");
+
+ Timer addFileTimer = new Timer();
+ addFileTimer.schedule(new AddAdaptorTask(adaptorName, type, params,
+ offset, numRetries - 1, retryInterval), retryInterval);
+ }
+ } else {
+ System.err.println("Giving up on connecting to the local agent");
+ }
+ return adaptorID;
+ }
+
+ public synchronized ChukwaAgentController.Adaptor remove(long adaptorID)
+ throws IOException {
+ syncWithAgent();
+ ChukwaAgentController.Adaptor a = runningAdaptors.remove(adaptorID);
+ a.unregister();
+ return a;
+
+ }
+
+ public void remove(String className, String appType, String filename)
+ throws IOException {
+ syncWithAgent();
+ // search for FileTail adaptor with string of this file name
+ // get its id, tell it to unregister itself with the agent,
+ // then remove it from the list of adaptors
+ for (Adaptor a : runningAdaptors.values()) {
+ if (a.name.equals(className) && a.params.equals(filename)
+ && a.appType.equals(appType)) {
+ remove(a.id);
+ }
+ }
+ }
+
+ public void removeAll() {
+ syncWithAgent();
+ Long[] keyset = runningAdaptors.keySet().toArray(new Long[] {});
+
+ for (long id : keyset) {
+ try {
+ remove(id);
+ } catch (IOException ioe) {
+ System.err.println("Error removing an adaptor in removeAll()");
+ ioe.printStackTrace();
+ }
+ System.out.println("Successfully removed adaptor " + id);
+ }
+ }
+
+ Map<Long, ChukwaAgentController.Adaptor> list() throws IOException {
+ Socket s = new Socket(hostname, portno);
+ PrintWriter bw = new PrintWriter(
+ new OutputStreamWriter(s.getOutputStream()));
+
+ bw.println("LIST");
+ bw.flush();
+ BufferedReader br = new BufferedReader(new InputStreamReader(s
+ .getInputStream()));
+ String ln;
+ Map<Long, Adaptor> listResult = new HashMap<Long, Adaptor>();
+ while ((ln = br.readLine()) != null) {
+ if (ln.equals("")) {
+ break;
+ } else {
+ String[] parts = ln.split("\\s+");
+ if (parts.length >= 4) { // should have id, className appType, params,
+ // offset
+ long id = Long
+ .parseLong(parts[0].substring(0, parts[0].length() - 1)); // chop
+ // off
+ // the
+ // right
+ // -
+ // paren
+ long offset = Long.parseLong(parts[parts.length - 1]);
+ String tmpParams = parts[3];
+ for (int i = 4; i < parts.length - 1; i++) {
+ tmpParams += " " + parts[i];
+ }
+ listResult.put(id, new Adaptor(id, parts[1], parts[2], tmpParams,
+ offset));
+ }
+ }
+ }
+ s.close();
+ return listResult;
+ }
+
+ // ************************************************************************
+ // The following functions are convenience functions, defining an easy
+ // to use API for application developers to integrate chukwa into their app
+ // ************************************************************************
+
+ /**
+ * Registers a new "LineFileTailUTF8" adaptor and starts it at offset 0.
+ * Checks to see if the file is being watched already, if so, won't register
+ * another adaptor with the agent. If you have run the tail adaptor on this
+ * file before and rotated or emptied the file you should use
+ * {@link ChukwaAgentController#pauseFile(String, String)} and
+ * {@link ChukwaAgentController#resumeFile(String, String)} which will store
+ * the adaptors metadata and re-use them to pick up where it left off.
+ *
+ * @param type the datatype associated with the file to pass through
+ * @param filename of the file for the tail adaptor to start monitoring
* @return the id number of the adaptor, generated by the agent
*/
- public long add(String adaptorName, String type, String params, long offset){
- return add(adaptorName, type, params, offset, 20, 15* 1000);//retry for five minutes, every fifteen seconds
- }
-
- /**
- * Registers a new adaptor. Makes no guarantee about success. On failure,
- * to connect to server, will retry <code>numRetries</code> times, every
- * <code>retryInterval</code> milliseconds.
- * @return the id number of the adaptor, generated by the agent
- */
- public long add(String adaptorName, String type, String params, long offset, long numRetries, long retryInterval){
- ChukwaAgentController.Adaptor adaptor = new ChukwaAgentController.Adaptor(adaptorName, type, params, offset);
- long adaptorID = -1;
- if (numRetries >= 0){
- try{
- adaptorID = adaptor.register();
-
- if (adaptorID > 0){
- runningAdaptors.put(adaptorID,adaptor);
- }
- else{
- System.err.println("Failed to successfully add the adaptor in AgentClient, adaptorID returned by add() was negative.");
- }
- }catch(IOException ioe){
- System.out.println("AgentClient failed to contact the agent (" + hostname + ":" + portno + ")");
- System.out.println("Scheduling a agent connection retry for adaptor add() in another " +
- retryInterval + " milliseconds, " + numRetries + " retries remaining");
-
- Timer addFileTimer = new Timer();
- addFileTimer.schedule(new AddAdaptorTask(adaptorName, type, params, offset, numRetries-1, retryInterval), retryInterval);
- }
- }else{
- System.err.println("Giving up on connecting to the local agent");
- }
- return adaptorID;
- }
-
- public synchronized ChukwaAgentController.Adaptor remove(long adaptorID) throws IOException
- {
- syncWithAgent();
- ChukwaAgentController.Adaptor a = runningAdaptors.remove(adaptorID);
- a.unregister();
- return a;
-
- }
-
- public void remove(String className, String appType, String filename) throws IOException
- {
- syncWithAgent();
- // search for FileTail adaptor with string of this file name
- // get its id, tell it to unregister itself with the agent,
- // then remove it from the list of adaptors
- for (Adaptor a : runningAdaptors.values()){
- if (a.name.equals(className) && a.params.equals(filename) && a.appType.equals(appType)){
- remove(a.id);
- }
- }
- }
-
-
- public void removeAll(){
- syncWithAgent();
- Long[] keyset = runningAdaptors.keySet().toArray(new Long[]{});
-
- for (long id : keyset){
- try {
- remove(id);
- }catch(IOException ioe){
- System.err.println("Error removing an adaptor in removeAll()");
- ioe.printStackTrace();
- }
- System.out.println("Successfully removed adaptor " + id);
- }
- }
-
- Map<Long,ChukwaAgentController.Adaptor> list() throws IOException
- {
- Socket s = new Socket(hostname, portno);
- PrintWriter bw = new PrintWriter(new OutputStreamWriter(s.getOutputStream()));
-
- bw.println("LIST");
- bw.flush();
- BufferedReader br = new BufferedReader(new InputStreamReader(s.getInputStream()));
- String ln;
- Map<Long,Adaptor> listResult = new HashMap<Long,Adaptor>();
- while((ln = br.readLine())!= null)
- {
- if (ln.equals("")){
- break;
- }else{
- String[] parts = ln.split("\\s+");
- if (parts.length >= 4){ //should have id, className appType, params, offset
- long id = Long.parseLong(parts[0].substring(0,parts[0].length()-1)); //chop off the right-paren
- long offset = Long.parseLong(parts[parts.length-1]);
- String tmpParams = parts[3];
- for (int i = 4; i<parts.length-1; i++){
- tmpParams += " " + parts[i];
- }
- listResult.put(id, new Adaptor(id,parts[1],parts[2],tmpParams, offset));
- }
- }
- }
- s.close();
- return listResult;
- }
-
- //************************************************************************
- // The following functions are convenience functions, defining an easy
- // to use API for application developers to integrate chukwa into their app
- //************************************************************************
-
- /**
- * Registers a new "LineFileTailUTF8" adaptor and starts it at offset 0. Checks to
- * see if the file is being watched already, if so, won't register another adaptor
- * with the agent. If you have run the tail adaptor on this file before and rotated
- * or emptied the file you should use {@link ChukwaAgentController#pauseFile(String, String)}
- * and {@link ChukwaAgentController#resumeFile(String, String)} which will store the adaptors
- * metadata and re-use them to pick up where it left off.
- * @param type the datatype associated with the file to pass through
- * @param filename of the file for the tail adaptor to start monitoring
- * @return the id number of the adaptor, generated by the agent
- */
- public long addFile(String appType, String filename, long numRetries, long retryInterval)
- {
+ public long addFile(String appType, String filename, long numRetries,
+ long retryInterval) {
filename = new File(filename).getAbsolutePath();
- //TODO: Mabye we want to check to see if the file exists here?
- // Probably not because they might be talking to an agent on a different machine?
-
- //check to see if this file is being watched already, if yes don't set up another adaptor for it
+ // TODO: Mabye we want to check to see if the file exists here?
+ // Probably not because they might be talking to an agent on a different
+ // machine?
+
+ // check to see if this file is being watched already, if yes don't set up
+ // another adaptor for it
boolean isDuplicate = false;
- for (Adaptor a : runningAdaptors.values()){
- if (a.name.equals(DEFAULT_FILE_TAILER) && a.appType.equals(appType) && a.params.endsWith(filename)){
+ for (Adaptor a : runningAdaptors.values()) {
+ if (a.name.equals(DEFAULT_FILE_TAILER) && a.appType.equals(appType)
+ && a.params.endsWith(filename)) {
isDuplicate = true;
}
}
- if (!isDuplicate){
- return add(DEFAULT_FILE_TAILER, appType, 0L + " " + filename,0L, numRetries, retryInterval);
- }
- else{
- System.out.println("An adaptor for filename \"" + filename + "\", type \""
- + appType + "\", exists already, addFile() command aborted");
+ if (!isDuplicate) {
+ return add(DEFAULT_FILE_TAILER, appType, 0L + " " + filename, 0L,
+ numRetries, retryInterval);
+ } else {
+ System.out.println("An adaptor for filename \"" + filename
+ + "\", type \"" + appType
+ + "\", exists already, addFile() command aborted");
return -1;
}
}
-
- public long addFile(String appType, String filename){
+
+ public long addFile(String appType, String filename) {
return addFile(appType, filename, 0, 0);
}
-
+
/**
- * Pause all active adaptors of the default file tailing type who are tailing this file
- * This means we actually stop the adaptor and it goes away forever, but we store it
- * state so that we can re-launch a new adaptor with the same state later.
+ * Pause all active adaptors of the default file tailing type who are tailing
+ * this file This means we actually stop the adaptor and it goes away forever,
+ * but we store it state so that we can re-launch a new adaptor with the same
+ * state later.
+ *
* @param appType
* @param filename
- * @return array of adaptorID numbers which have been created and assigned the state of the formerly paused adaptors
+ * @return array of adaptorID numbers which have been created and assigned the
+ * state of the formerly paused adaptors
* @throws IOException
*/
- public Collection<Long> pauseFile(String appType, String filename) throws IOException{
+ public Collection<Long> pauseFile(String appType, String filename)
+ throws IOException {
syncWithAgent();
- //store the unique streamid of the file we are pausing.
- //search the list of adaptors for this filename
- //store the current offset for it
+ // store the unique streamid of the file we are pausing.
+ // search the list of adaptors for this filename
+ // store the current offset for it
List<Long> results = new ArrayList<Long>();
- for (Adaptor a : runningAdaptors.values()){
- if (a.name.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename) && a.appType.equals(appType)){
- pausedAdaptors.put(a.id,a); //add it to our list of paused adaptors
- remove(a.id); //tell the agent to remove/unregister it
+ for (Adaptor a : runningAdaptors.values()) {
+ if (a.name.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename)
+ && a.appType.equals(appType)) {
+ pausedAdaptors.put(a.id, a); // add it to our list of paused adaptors
+ remove(a.id); // tell the agent to remove/unregister it
results.add(a.id);
}
}
return results;
}
-
- public boolean isFilePaused(String appType, String filename){
- for (Adaptor a : pausedAdaptors.values()){
- if (a.name.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename) && a.appType.equals(appType)){
+
+ public boolean isFilePaused(String appType, String filename) {
+ for (Adaptor a : pausedAdaptors.values()) {
+ if (a.name.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename)
+ && a.appType.equals(appType)) {
return true;
}
}
return false;
}
-
+
/**
* Resume all adaptors for this filename that have been paused
+ *
* @param appType the appType
- * @param filename filename by which to lookup adaptors which are paused (and tailing this file)
- * @return an array of the new adaptor ID numbers which have resumed where the old adaptors left off
+ * @param filename filename by which to lookup adaptors which are paused (and
+ * tailing this file)
+ * @return an array of the new adaptor ID numbers which have resumed where the
+ * old adaptors left off
* @throws IOException
*/
- public Collection<Long> resumeFile(String appType, String filename) throws IOException{
+ public Collection<Long> resumeFile(String appType, String filename)
+ throws IOException {
syncWithAgent();
- //search for a record of this paused file
+ // search for a record of this paused file
List<Long> results = new ArrayList<Long>();
- for (Adaptor a : pausedAdaptors.values()){
- if (a.name.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename) && a.appType.equals(appType)){
- long newID = add(DEFAULT_FILE_TAILER, a.appType, a.offset + " " + filename, a.offset);
+ for (Adaptor a : pausedAdaptors.values()) {
+ if (a.name.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename)
+ && a.appType.equals(appType)) {
+ long newID = add(DEFAULT_FILE_TAILER, a.appType, a.offset + " "
+ + filename, a.offset);
pausedAdaptors.remove(a.id);
a.id = newID;
results.add(a.id);
@@ -419,104 +463,102 @@
}
return results;
}
-
-
- public void removeFile(String appType, String filename) throws IOException
- {
+
+ public void removeFile(String appType, String filename) throws IOException {
syncWithAgent();
// search for FileTail adaptor with string of this file name
// get its id, tell it to unregister itself with the agent,
// then remove it from the list of adaptors
- for (Adaptor a : runningAdaptors.values()){
- if (a.name.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename) && a.appType.equals(appType)){
+ for (Adaptor a : runningAdaptors.values()) {
+ if (a.name.equals(DEFAULT_FILE_TAILER) && a.params.endsWith(filename)
+ && a.appType.equals(appType)) {
remove(a.id);
}
}
}
-
- //************************************************************************
+
+ // ************************************************************************
// command line utilities
- //************************************************************************
-
- public static void main(String[] args)
- {
+ // ************************************************************************
+
+ public static void main(String[] args) {
ChukwaAgentController c = getClient(args);
- if (numArgs >= 3 && args[0].toLowerCase().equals("addfile")){
+ if (numArgs >= 3 && args[0].toLowerCase().equals("addfile")) {
doAddFile(c, args[1], args[2]);
- }
- else if (numArgs >= 3 && args[0].toLowerCase().equals("removefile")){
+ } else if (numArgs >= 3 && args[0].toLowerCase().equals("removefile")) {
doRemoveFile(c, args[1], args[2]);
- }
- else if(numArgs >= 1 && args[0].toLowerCase().equals("list")){
+ } else if (numArgs >= 1 && args[0].toLowerCase().equals("list")) {
doList(c);
- }
- else if(numArgs >= 1 && args[0].equalsIgnoreCase("removeall")){
+ } else if (numArgs >= 1 && args[0].equalsIgnoreCase("removeall")) {
doRemoveAll(c);
- }
- else{
- System.err.println("usage: ChukwaClient addfile <apptype> <filename> [-h hostname] [-p portnumber]");
- System.err.println(" ChukwaClient removefile adaptorID [-h hostname] [-p portnumber]");
- System.err.println(" ChukwaClient removefile <apptype> <filename> [-h hostname] [-p portnumber]");
+ } else {
+ System.err
+ .println("usage: ChukwaClient addfile <apptype> <filename> [-h hostname] [-p portnumber]");
+ System.err
+ .println(" ChukwaClient removefile adaptorID [-h hostname] [-p portnumber]");
+ System.err
+ .println(" ChukwaClient removefile <apptype> <filename> [-h hostname] [-p portnumber]");
System.err.println(" ChukwaClient list [IP] [port]");
System.err.println(" ChukwaClient removeAll [IP] [port]");
}
}
-
- private static ChukwaAgentController getClient(String[] args){
+
+ private static ChukwaAgentController getClient(String[] args) {
int portno = 9093;
String hostname = "localhost";
numArgs = args.length;
-
- for (int i = 0; i < args.length; i++){
- if(args[i].equals("-h") && args.length > i + 1){
- hostname = args[i+1];
- System.out.println ("Setting hostname to: " + hostname);
- numArgs -= 2; //subtract for the flag and value
- }
- else if (args[i].equals("-p") && args.length > i + 1){
- portno = Integer.parseInt(args[i+1]);
- System.out.println ("Setting portno to: " + portno);
- numArgs -= 2; //subtract for the flat, i.e. -p, and value
+
+ for (int i = 0; i < args.length; i++) {
+ if (args[i].equals("-h") && args.length > i + 1) {
+ hostname = args[i + 1];
+ System.out.println("Setting hostname to: " + hostname);
+ numArgs -= 2; // subtract for the flag and value
+ } else if (args[i].equals("-p") && args.length > i + 1) {
+ portno = Integer.parseInt(args[i + 1]);
+ System.out.println("Setting portno to: " + portno);
+ numArgs -= 2; // subtract for the flat, i.e. -p, and value
}
}
return new ChukwaAgentController(hostname, portno);
}
-
- private static long doAddFile(ChukwaAgentController c, String appType, String params){
+
+ private static long doAddFile(ChukwaAgentController c, String appType,
+ String params) {
System.out.println("Adding adaptor with filename: " + params);
long adaptorID = c.addFile(appType, params);
- if (adaptorID != -1){
+ if (adaptorID != -1) {
System.out.println("Successfully added adaptor, id is:" + adaptorID);
- }else{
- System.err.println("Agent reported failure to add adaptor, adaptor id returned was:" + adaptorID);
+ } else {
+ System.err
+ .println("Agent reported failure to add adaptor, adaptor id returned was:"
+ + adaptorID);
}
return adaptorID;
}
-
- private static void doRemoveFile(ChukwaAgentController c, String appType, String params){
- try{
+
+ private static void doRemoveFile(ChukwaAgentController c, String appType,
+ String params) {
+ try {
System.out.println("Removing adaptor with filename: " + params);
- c.removeFile(appType,params);
- }
- catch(IOException e)
- {
+ c.removeFile(appType, params);
+ } catch (IOException e) {
e.printStackTrace();
}
}
-
- private static void doList(ChukwaAgentController c){
- try{
+
+ private static void doList(ChukwaAgentController c) {
+ try {
Iterator<Adaptor> adptrs = c.list().values().iterator();
- while (adptrs.hasNext()){
+ while (adptrs.hasNext()) {
System.out.println(adptrs.next().toString());
}
- } catch(Exception e){
+ } catch (Exception e) {
e.printStackTrace();
}
}
-
- private static void doRemoveAll(ChukwaAgentController c){
+
+ private static void doRemoveAll(ChukwaAgentController c) {
System.out.println("Removing all adaptors");
c.removeAll();
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/protocol/Protocol.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/protocol/Protocol.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/protocol/Protocol.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/protocol/Protocol.java Wed Mar 11 22:39:26 2009
@@ -17,17 +17,18 @@
*/
package org.apache.hadoop.chukwa.datacollection.protocol;
+
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
-
import org.apache.hadoop.chukwa.Chunk;
-public interface Protocol
-{
- public byte[] toByteArray(List<Chunk> chunks);
- public List<Chunk> parseFrom(byte[] bytes);
-
- void writeTo(OutputStream output);
- List<Chunk> parseFrom(InputStream input);
+public interface Protocol {
+ public byte[] toByteArray(List<Chunk> chunks);
+
+ public List<Chunk> parseFrom(byte[] bytes);
+
+ void writeTo(OutputStream output);
+
+ List<Chunk> parseFrom(InputStream input);
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java Wed Mar 11 22:39:26 2009
@@ -18,6 +18,7 @@
package org.apache.hadoop.chukwa.datacollection.sender;
+
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.DataOutputStream;
@@ -28,7 +29,6 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpException;
import org.apache.commons.httpclient.HttpMethod;
@@ -45,56 +45,56 @@
import org.apache.log4j.Logger;
/**
- * Encapsulates all of the http setup and connection details needed for
- * chunks to be delivered to a collector.
+ * Encapsulates all of the http setup and connection details needed for chunks
+ * to be delivered to a collector.
+ * <p>
+ * On error, tries the list of available collectors, pauses for a minute, and
+ * then repeats.
+ * </p>
* <p>
- * On error, tries the list of available collectors, pauses for a minute, and then repeats.
+ * Will wait forever for collectors to come up.
* </p>
- * <p> Will wait forever for collectors to come up. </p>
*/
-public class ChukwaHttpSender implements ChukwaSender{
- final int MAX_RETRIES_PER_COLLECTOR ; //fast retries, in http client
- final int SENDER_RETRIES;
- final int WAIT_FOR_COLLECTOR_REBOOT;
- //FIXME: this should really correspond to the timer in RetryListOfCollectors
-
+public class ChukwaHttpSender implements ChukwaSender {
+ final int MAX_RETRIES_PER_COLLECTOR; // fast retries, in http client
+ final int SENDER_RETRIES;
+ final int WAIT_FOR_COLLECTOR_REBOOT;
+ // FIXME: this should really correspond to the timer in RetryListOfCollectors
+
static Logger log = Logger.getLogger(ChukwaHttpSender.class);
static HttpClient client = null;
static MultiThreadedHttpConnectionManager connectionManager = null;
static String currCollector = null;
-
protected Iterator<String> collectors;
-
- static
- {
- connectionManager =
- new MultiThreadedHttpConnectionManager();
+
+ static {
+ connectionManager = new MultiThreadedHttpConnectionManager();
client = new HttpClient(connectionManager);
connectionManager.closeIdleConnections(1000);
}
-
+
public static class CommitListEntry {
public Adaptor adaptor;
public long uuid;
-
- public CommitListEntry(Adaptor a, long uuid) {
+
+ public CommitListEntry(Adaptor a, long uuid) {
adaptor = a;
this.uuid = uuid;
}
}
-
-//FIXME: probably we're better off with an EventListRequestEntity
+
+ // FIXME: probably we're better off with an EventListRequestEntity
static class BuffersRequestEntity implements RequestEntity {
List<DataOutputBuffer> buffers;
-
+
public BuffersRequestEntity(List<DataOutputBuffer> buf) {
- buffers=buf;
+ buffers = buf;
}
- public long getContentLength() {
- long len=4;//first we send post length, then buffers
- for(DataOutputBuffer b: buffers)
+ public long getContentLength() {
+ long len = 4;// first we send post length, then buffers
+ for (DataOutputBuffer b : buffers)
len += b.getLength();
return len;
}
@@ -103,167 +103,179 @@
return "application/octet-stream";
}
- public boolean isRepeatable() {
+ public boolean isRepeatable() {
return true;
}
- public void writeRequest(OutputStream out) throws IOException {
+ public void writeRequest(OutputStream out) throws IOException {
DataOutputStream dos = new DataOutputStream(out);
dos.writeInt(buffers.size());
- for(DataOutputBuffer b: buffers)
+ for (DataOutputBuffer b : buffers)
dos.write(b.getData(), 0, b.getLength());
}
}
- public ChukwaHttpSender(Configuration c){
- //setup default collector
+ public ChukwaHttpSender(Configuration c) {
+ // setup default collector
ArrayList<String> tmp = new ArrayList<String>();
this.collectors = tmp.iterator();
- log.info("added a single collector to collector list in ConnectorClient constructor, it's hasNext is now: " + collectors.hasNext());
+ log
+ .info("added a single collector to collector list in ConnectorClient constructor, it's hasNext is now: "
+ + collectors.hasNext());
MAX_RETRIES_PER_COLLECTOR = c.getInt("chukwaAgent.sender.fastRetries", 4);
- SENDER_RETRIES= c.getInt("chukwaAgent.sender.retries", 144000);
- WAIT_FOR_COLLECTOR_REBOOT= c.getInt("chukwaAgent.sender.retryInterval", 20*1000);
+ SENDER_RETRIES = c.getInt("chukwaAgent.sender.retries", 144000);
+ WAIT_FOR_COLLECTOR_REBOOT = c.getInt("chukwaAgent.sender.retryInterval",
+ 20 * 1000);
}
-
+
/**
* Set up a single connector for this client to send {@link Chunk}s to
+ *
* @param collector the url of the collector
*/
- public void setCollectors(String collector){
- }
-
+ public void setCollectors(String collector) {
+ }
+
/**
* Set up a list of connectors for this client to send {@link Chunk}s to
+ *
* @param collectors
*/
- public void setCollectors(Iterator<String> collectors){
- this.collectors = collectors;
- //setup a new destination from our list of collectors if one hasn't been set up
- if (currCollector == null){
- if (collectors.hasNext()){
+ public void setCollectors(Iterator<String> collectors) {
+ this.collectors = collectors;
+ // setup a new destination from our list of collectors if one hasn't been
+ // set up
+ if (currCollector == null) {
+ if (collectors.hasNext()) {
currCollector = collectors.next();
- }
- else
- log.error("No collectors to try in send(), not even trying to do doPost()");
+ } else
+ log
+ .error("No collectors to try in send(), not even trying to do doPost()");
}
}
-
-
+
/**
- * grab all of the chunks currently in the chunkQueue, stores a copy of them locally, calculates
- * their size, sets them up
+ * grab all of the chunks currently in the chunkQueue, stores a copy of them
+ * locally, calculates their size, sets them up
+ *
* @return array of chunk id's which were ACKed by collector
*/
- public List<CommitListEntry> send(List<Chunk> toSend) throws InterruptedException, IOException{
+ public List<CommitListEntry> send(List<Chunk> toSend)
+ throws InterruptedException, IOException {
List<DataOutputBuffer> serializedEvents = new ArrayList<DataOutputBuffer>();
List<CommitListEntry> commitResults = new ArrayList<CommitListEntry>();
-
+
log.info("collected " + toSend.size() + " chunks");
- //Serialize each chunk in turn into it's own DataOutputBuffer and add that buffer to serializedEvents
- for(Chunk c: toSend) {
+ // Serialize each chunk in turn into it's own DataOutputBuffer and add that
+ // buffer to serializedEvents
+ for (Chunk c : toSend) {
DataOutputBuffer b = new DataOutputBuffer(c.getSerializedSizeEstimate());
try {
c.write(b);
- }catch(IOException err) {
+ } catch (IOException err) {
log.error("serialization threw IOException", err);
}
serializedEvents.add(b);
- //store a CLE for this chunk which we will use to ack this chunk to the caller of send()
- //(e.g. the agent will use the list of CLE's for checkpointing)
+ // store a CLE for this chunk which we will use to ack this chunk to the
+ // caller of send()
+ // (e.g. the agent will use the list of CLE's for checkpointing)
commitResults.add(new CommitListEntry(c.getInitiator(), c.getSeqID()));
}
toSend.clear();
-
- //collect all serialized chunks into a single buffer to send
- RequestEntity postData = new BuffersRequestEntity(serializedEvents);
+ // collect all serialized chunks into a single buffer to send
+ RequestEntity postData = new BuffersRequestEntity(serializedEvents);
- int retries = SENDER_RETRIES;
- while(currCollector != null)
- {
- //need to pick a destination here
+ int retries = SENDER_RETRIES;
+ while (currCollector != null) {
+ // need to pick a destination here
PostMethod method = new PostMethod();
- try {
+ try {
doPost(method, postData, currCollector);
- retries = SENDER_RETRIES; //reset count on success
- //if no exception was thrown from doPost, ACK that these chunks were sent
+ retries = SENDER_RETRIES; // reset count on success
+ // if no exception was thrown from doPost, ACK that these chunks were
+ // sent
return commitResults;
} catch (Throwable e) {
log.error("Http post exception", e);
- log.info("Checking list of collectors to see if another collector has been specified for rollover");
- if (collectors.hasNext()){
+ log
+ .info("Checking list of collectors to see if another collector has been specified for rollover");
+ if (collectors.hasNext()) {
currCollector = collectors.next();
- log.info("Found a new collector to roll over to, retrying HTTP Post to collector " + currCollector);
+ log
+ .info("Found a new collector to roll over to, retrying HTTP Post to collector "
+ + currCollector);
} else {
- if(retries > 0) {
- log.warn("No more collectors to try rolling over to; waiting " + WAIT_FOR_COLLECTOR_REBOOT +
- " ms (" + retries + "retries left)");
+ if (retries > 0) {
+ log.warn("No more collectors to try rolling over to; waiting "
+ + WAIT_FOR_COLLECTOR_REBOOT + " ms (" + retries
+ + "retries left)");
Thread.sleep(WAIT_FOR_COLLECTOR_REBOOT);
- retries --;
+ retries--;
} else {
log.error("No more collectors to try rolling over to; aborting");
throw new IOException("no collectors");
}
}
- }
- finally {
+ } finally {
// be sure the connection is released back to the connection manager
method.releaseConnection();
}
- } //end retry loop
+ } // end retry loop
return new ArrayList<CommitListEntry>();
}
-
+
/**
* Handles the HTTP post. Throws HttpException on failure
*/
@SuppressWarnings("deprecation")
private void doPost(PostMethod method, RequestEntity data, String dest)
- throws IOException, HttpException
- {
-
+ throws IOException, HttpException {
+
HttpMethodParams pars = method.getParams();
- pars.setParameter (HttpMethodParams.RETRY_HANDLER, (Object) new HttpMethodRetryHandler()
- {
- public boolean retryMethod(HttpMethod m, IOException e, int exec)
- {
- return !(e instanceof java.net.ConnectException) && (exec < MAX_RETRIES_PER_COLLECTOR);
- }
- });
-
- pars.setParameter(HttpMethodParams.SO_TIMEOUT , new Integer(30000));
-
-
-
+ pars.setParameter(HttpMethodParams.RETRY_HANDLER,
+ (Object) new HttpMethodRetryHandler() {
+ public boolean retryMethod(HttpMethod m, IOException e, int exec) {
+ return !(e instanceof java.net.ConnectException)
+ && (exec < MAX_RETRIES_PER_COLLECTOR);
+ }
+ });
+
+ pars.setParameter(HttpMethodParams.SO_TIMEOUT, new Integer(30000));
+
method.setParams(pars);
method.setPath(dest);
-
- //send it across the network
+
+ // send it across the network
method.setRequestEntity(data);
-
- log.info(">>>>>> HTTP post to " + dest+" length = "+ data.getContentLength());
+
+ log.info(">>>>>> HTTP post to " + dest + " length = "
+ + data.getContentLength());
// Send POST request
-
- //client.setTimeout(15*1000);
+
+ // client.setTimeout(15*1000);
int statusCode = client.executeMethod(method);
-
- if (statusCode != HttpStatus.SC_OK) {
- log.error(">>>>>> HTTP post response statusCode: " +statusCode + ", statusLine: " + method.getStatusLine());
- //do something aggressive here
+
+ if (statusCode != HttpStatus.SC_OK) {
+ log.error(">>>>>> HTTP post response statusCode: " + statusCode
+ + ", statusLine: " + method.getStatusLine());
+ // do something aggressive here
throw new HttpException("got back a failure from server");
}
- //implicitly "else"
- log.info(">>>>>> HTTP Got success back from the remote collector; response length "+ method.getResponseContentLength());
+ // implicitly "else"
+ log
+ .info(">>>>>> HTTP Got success back from the remote collector; response length "
+ + method.getResponseContentLength());
- //FIXME: should parse acks here
+ // FIXME: should parse acks here
InputStream rstream = null;
-
+
// Get the response body
byte[] resp_buf = method.getResponseBody();
- rstream = new ByteArrayInputStream(resp_buf);
+ rstream = new ByteArrayInputStream(resp_buf);
BufferedReader br = new BufferedReader(new InputStreamReader(rstream));
String line;
while ((line = br.readLine()) != null) {
@@ -272,10 +284,12 @@
}
}
}
-
- public static void main(String[] argv) throws InterruptedException{
- //HttpConnectorClient cc = new HttpConnectorClient();
- //do something smarter than to hide record headaches, like force them to create and add records to a chunk
- //cc.addChunk("test-source", "test-streamName", "test-application", "test-dataType", new byte[]{1,2,3,4,5});
+
+ public static void main(String[] argv) throws InterruptedException {
+ // HttpConnectorClient cc = new HttpConnectorClient();
+ // do something smarter than to hide record headaches, like force them to
+ // create and add records to a chunk
+ // cc.addChunk("test-source", "test-streamName", "test-application",
+ // "test-dataType", new byte[]{1,2,3,4,5});
}
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaSender.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaSender.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaSender.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaSender.java Wed Mar 11 22:39:26 2009
@@ -1,12 +1,12 @@
package org.apache.hadoop.chukwa.datacollection.sender;
+
/**
* Encapsulates all of the communication overhead needed for chunks to be delivered
* to a collector.
*/
import java.util.Iterator;
import java.util.List;
-
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.sender.ChukwaHttpSender.CommitListEntry;
@@ -16,10 +16,11 @@
*
* @param chunksToSend a list of chunks to commit
* @return the list of committed chunks
- * @throws InterruptedException if interrupted while trying to send
+ * @throws InterruptedException if interrupted while trying to send
*/
- public List<CommitListEntry> send(List<Chunk> chunksToSend) throws InterruptedException, java.io.IOException;
-
+ public List<CommitListEntry> send(List<Chunk> chunksToSend)
+ throws InterruptedException, java.io.IOException;
+
public void setCollectors(Iterator<String> collectors);
-
+
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java Wed Mar 11 22:39:26 2009
@@ -18,67 +18,71 @@
package org.apache.hadoop.chukwa.datacollection.sender;
+
import java.io.*;
import java.net.URL;
import java.util.*;
-
import org.apache.hadoop.conf.Configuration;
/***
- * An iterator returning a list of Collectors to try.
- * This class is nondeterministic, since it puts collectors back on the list after some period.
+ * An iterator returning a list of Collectors to try. This class is
+ * nondeterministic, since it puts collectors back on the list after some
+ * period.
+ *
+ * No node will be polled more than once per maxRetryRateMs milliseconds.
+ * hasNext() will continue return true if you have not called it recently.
+ *
*
- * No node will be polled more than once per maxRetryRateMs milliseconds. hasNext() will continue return
- * true if you have not called it recently.
- *
- *
*/
public class RetryListOfCollectors implements Iterator<String> {
int maxRetryRateMs;
List<String> collectors;
long lastLookAtFirstNode;
- int nextCollector=0;
- private String portNo;
+ int nextCollector = 0;
+ private String portNo;
Configuration conf;
-
- public RetryListOfCollectors(File collectorFile, int maxRetryRateMs) throws IOException {
+
+ public RetryListOfCollectors(File collectorFile, int maxRetryRateMs)
+ throws IOException {
this.maxRetryRateMs = maxRetryRateMs;
lastLookAtFirstNode = 0;
collectors = new ArrayList<String>();
conf = new Configuration();
- portNo = conf.get("chukwaCollector.http.port","8080");
-
- try{
- BufferedReader br = new BufferedReader(new FileReader(collectorFile));
+ portNo = conf.get("chukwaCollector.http.port", "8080");
+
+ try {
+ BufferedReader br = new BufferedReader(new FileReader(collectorFile));
String line;
- while((line = br.readLine()) != null) {
- if(!line.contains("://")) {
- //no protocol, assume http
- if(line.matches(":\\d+")) {
- collectors.add("http://" + line);
- } else {
- collectors.add("http://" + line + ":" + portNo + "/");
- }
+ while ((line = br.readLine()) != null) {
+ if (!line.contains("://")) {
+ // no protocol, assume http
+ if (line.matches(":\\d+")) {
+ collectors.add("http://" + line);
+ } else {
+ collectors.add("http://" + line + ":" + portNo + "/");
+ }
} else {
- if(line.matches(":\\d+")) {
- collectors.add(line);
- } else {
- collectors.add(line + ":" + portNo + "/");
- }
- collectors.add(line);
+ if (line.matches(":\\d+")) {
+ collectors.add(line);
+ } else {
+ collectors.add(line + ":" + portNo + "/");
+ }
+ collectors.add(line);
}
}
br.close();
- }catch(FileNotFoundException e){
- System.err.println("Error in RetryListOfCollectors() opening file: collectors, double check that you have set the CHUKWA_CONF_DIR environment variable. Also, ensure file exists and is in classpath");
- }catch(IOException e){
- System.err.println("I/O error in RetryListOfcollectors instantiation in readLine() from specified collectors file");
+ } catch (FileNotFoundException e) {
+ System.err
+ .println("Error in RetryListOfCollectors() opening file: collectors, double check that you have set the CHUKWA_CONF_DIR environment variable. Also, ensure file exists and is in classpath");
+ } catch (IOException e) {
+ System.err
+ .println("I/O error in RetryListOfcollectors instantiation in readLine() from specified collectors file");
throw e;
}
shuffleList();
}
-
+
public RetryListOfCollectors(final List<String> collectors, int maxRetryRateMs) {
this.maxRetryRateMs = maxRetryRateMs;
lastLookAtFirstNode = 0;
@@ -86,54 +90,54 @@
this.collectors.addAll(collectors);
shuffleList();
}
-
- //for now, use a simple O(n^2) algorithm.
- //safe, because we only do this once, and on smalls list
+
+ // for now, use a simple O(n^2) algorithm.
+ // safe, because we only do this once, and on smalls list
private void shuffleList() {
- ArrayList<String> newList = new ArrayList<String>();
+ ArrayList<String> newList = new ArrayList<String>();
Random r = new java.util.Random();
- while(!collectors.isEmpty()) {
+ while (!collectors.isEmpty()) {
int toRemove = r.nextInt(collectors.size());
String next = collectors.remove(toRemove);
newList.add(next);
}
collectors = newList;
}
-
+
public boolean hasNext() {
- return collectors.size() > 0 &&
- ( (nextCollector != 0) ||
- (System.currentTimeMillis() - lastLookAtFirstNode > maxRetryRateMs ));
- }
+ return collectors.size() > 0
+ && ((nextCollector != 0) || (System.currentTimeMillis()
+ - lastLookAtFirstNode > maxRetryRateMs));
+ }
public String next() {
- if(hasNext()) {
+ if (hasNext()) {
int currCollector = nextCollector;
- nextCollector = (nextCollector +1)% collectors.size();
- if(currCollector == 0)
+ nextCollector = (nextCollector + 1) % collectors.size();
+ if (currCollector == 0)
lastLookAtFirstNode = System.currentTimeMillis();
return collectors.get(currCollector);
- }
- else
+ } else
return null;
}
-
- public String getRandomCollector(){
- return collectors.get( (int)java.lang.Math.random() * collectors.size());
+
+ public String getRandomCollector() {
+ return collectors.get((int) java.lang.Math.random() * collectors.size());
}
-
- public void add(URL collector){
+
+ public void add(URL collector) {
collectors.add(collector.toString());
}
- public void remove() {
+ public void remove() {
throw new UnsupportedOperationException();
- //FIXME: maybe just remove a collector from our list and then
- //FIXME: make sure next doesn't break (i.e. reset nextCollector if necessary)
+ // FIXME: maybe just remove a collector from our list and then
+ // FIXME: make sure next doesn't break (i.e. reset nextCollector if
+ // necessary)
}
/**
- *
+ *
* @return total number of collectors in list
*/
int total() {
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java Wed Mar 11 22:39:26 2009
@@ -18,80 +18,73 @@
package org.apache.hadoop.chukwa.datacollection.test;
+
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.*;
import org.apache.hadoop.chukwa.datacollection.agent.*;
import org.apache.hadoop.chukwa.datacollection.connector.Connector;
-
import java.util.*;
/**
- * Output events to stdout.
- * Intended for debugging use.
- *
+ * Output events to stdout. Intended for debugging use.
+ *
*/
public class ConsoleOutConnector extends Thread implements Connector {
-
+
final ChukwaAgent agent;
volatile boolean shutdown;
final boolean silent;
-
public ConsoleOutConnector(ChukwaAgent a) {
this(a, false);
}
-
- public ConsoleOutConnector(ChukwaAgent a, boolean silent)
- {
+
+ public ConsoleOutConnector(ChukwaAgent a, boolean silent) {
agent = a;
this.silent = silent;
}
-
- public void run()
- {
- try{
+
+ public void run() {
+ try {
System.out.println("console connector started");
ChunkQueue eventQueue = DataFactory.getInstance().getEventQueue();
- if(!silent)
+ if (!silent)
System.out.println("-------------------");
-
- while(!shutdown)
- {
+
+ while (!shutdown) {
List<Chunk> evts = new ArrayList<Chunk>();
eventQueue.collect(evts, 1);
-
- for(Chunk e: evts)
- {
- if(!silent) {
- System.out.println("Console out connector got event at offset " + e.getSeqID());
+
+ for (Chunk e : evts) {
+ if (!silent) {
+ System.out.println("Console out connector got event at offset "
+ + e.getSeqID());
System.out.println("data type was " + e.getDataType());
- if(e.getData().length > 1000)
- System.out.println("data length was " + e.getData().length+ ", not printing");
+ if (e.getData().length > 1000)
+ System.out.println("data length was " + e.getData().length
+ + ", not printing");
else
System.out.println(new String(e.getData()));
}
-
+
agent.reportCommit(e.getInitiator(), e.getSeqID());
-
- if(!silent)
+
+ if (!silent)
System.out.println("-------------------");
}
}
- }
- catch(InterruptedException e)
- {} //thread is about to exit anyway
+ } catch (InterruptedException e) {
+ } // thread is about to exit anyway
}
- public void shutdown()
- {
+ public void shutdown() {
shutdown = true;
this.interrupt();
}
-@Override
-public void reloadConfiguration()
-{
- System.out.println("reloadConfiguration");
-}
+ @Override
+ public void reloadConfiguration() {
+ System.out.println("reloadConfiguration");
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java Wed Mar 11 22:39:26 2009
@@ -18,6 +18,7 @@
package org.apache.hadoop.chukwa.datacollection.test;
+
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
@@ -32,66 +33,66 @@
public class FileTailerStressTest {
- static final int DELAY_MIN = 10*1000;
- static final int DELAY_RANGE = 2* 1000;
-
- static class OccasionalWriterThread extends Thread
- {
+ static final int DELAY_MIN = 10 * 1000;
+ static final int DELAY_RANGE = 2 * 1000;
+
+ static class OccasionalWriterThread extends Thread {
File file;
-
- OccasionalWriterThread(File f) {
+
+ OccasionalWriterThread(File f) {
file = f;
}
-
- public void run() {
+
+ public void run() {
try {
- FileOutputStream fos = new FileOutputStream(file);
- PrintWriter out = new PrintWriter(fos);
- Random rand = new Random();
- while(true) {
- int delay = rand.nextInt( DELAY_RANGE ) + DELAY_MIN;
- Thread.sleep(delay);
- Date d = new Date();
- out.println("some test data written at " + d.toString());
- out.flush();
- }
- } catch(IOException e) {
+ FileOutputStream fos = new FileOutputStream(file);
+ PrintWriter out = new PrintWriter(fos);
+ Random rand = new Random();
+ while (true) {
+ int delay = rand.nextInt(DELAY_RANGE) + DELAY_MIN;
+ Thread.sleep(delay);
+ Date d = new Date();
+ out.println("some test data written at " + d.toString());
+ out.flush();
+ }
+ } catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
}
}
}
-
-static int FILES_TO_USE = 100;
+
+ static int FILES_TO_USE = 100;
+
/**
* @param args
*/
- public static void main(String[] args)
- {
- try{
+ public static void main(String[] args) {
+ try {
Server server = new Server(9990);
- Context root = new Context(server,"/",Context.SESSIONS);
-
+ Context root = new Context(server, "/", Context.SESSIONS);
+
ServletCollector.setWriter(new ConsoleWriter(true));
- root.addServlet(new ServletHolder(new ServletCollector(new ChukwaConfiguration(true))), "/*");
+ root.addServlet(new ServletHolder(new ServletCollector(
+ new ChukwaConfiguration(true))), "/*");
server.start();
server.setStopAtShutdown(false);
-
+
Thread.sleep(1000);
ChukwaAgent agent = new ChukwaAgent();
- HttpConnector connector = new HttpConnector(agent, "http://localhost:9990/chukwa");
+ HttpConnector connector = new HttpConnector(agent,
+ "http://localhost:9990/chukwa");
connector.start();
-
+
ChukwaConfiguration cc = new ChukwaConfiguration();
int portno = cc.getInt("chukwaAgent.control.port", 9093);
ChukwaAgentController cli = new ChukwaAgentController("localhost", portno);
-
File workdir = new File("/tmp/stresstest/");
workdir.mkdir();
- for(int i = 0; i < FILES_TO_USE; ++i) {
- File newTestF = new File( "/tmp/stresstest/" + i);
-
+ for (int i = 0; i < FILES_TO_USE; ++i) {
+ File newTestF = new File("/tmp/stresstest/" + i);
+
newTestF.deleteOnExit();
(new OccasionalWriterThread(newTestF)).start();
cli.addFile("test-lines", newTestF.getAbsolutePath());
@@ -100,7 +101,7 @@
Thread.sleep(60 * 1000);
System.out.println("cleaning up");
workdir.delete();
- } catch(Exception e) {
+ } catch (Exception e) {
e.printStackTrace();
}
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/SinkFileValidator.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/SinkFileValidator.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/SinkFileValidator.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/SinkFileValidator.java Wed Mar 11 22:39:26 2009
@@ -18,8 +18,8 @@
package org.apache.hadoop.chukwa.datacollection.test;
-import java.net.URI;
+import java.net.URI;
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.conf.Configuration;
@@ -29,59 +29,53 @@
import org.apache.hadoop.io.Writable;
public class SinkFileValidator {
-
- public static void main(String[] args)
- {
+
+ public static void main(String[] args) {
String fsURL = "hdfs://localhost:9000";
String fname;
- if(args.length < 1)
- {
- System.out.println("usage: SinkFileValidator <filename> [filesystem URI] ");
+ if (args.length < 1) {
+ System.out
+ .println("usage: SinkFileValidator <filename> [filesystem URI] ");
System.exit(0);
}
fname = args[0];
- if(args.length > 1)
+ if (args.length > 1)
fsURL = args[1];
Configuration conf = new Configuration();
- try
- {
- FileSystem fs;
- if(fsURL.equals("local"))
- fs = FileSystem.getLocal(conf);
- else
- fs= FileSystem.get(new URI(fsURL), conf);
- SequenceFile.Reader r= new SequenceFile.Reader(fs, new Path(fname), conf);
- System.out.println("key class name is " + r.getKeyClassName());
- System.out.println("value class name is " + r.getValueClassName());
-
- ChukwaArchiveKey key = new ChukwaArchiveKey();
- ChunkImpl evt = ChunkImpl.getBlankChunk();
- int events = 0;
- while(r.next(key, evt) && (events < 5))
- {
- if(!Writable.class.isAssignableFrom(key.getClass()))
- System.out.println("warning: keys aren't writable");
-
- if(!Writable.class.isAssignableFrom(evt.getClass()))
- System.out.println("warning: values aren't writable");
-
- if(evt.getData().length > 1000)
- {
- System.out.println("got event; data: " + new String(evt.getData(), 0, 1000));
- System.out.println("....[truncating]");
- }
+ try {
+ FileSystem fs;
+ if (fsURL.equals("local"))
+ fs = FileSystem.getLocal(conf);
else
- System.out.println("got event; data: " + new String(evt.getData()));
- events ++;
- }
- System.out.println("file looks OK!");
- }
- catch(Exception e)
- {
+ fs = FileSystem.get(new URI(fsURL), conf);
+ SequenceFile.Reader r = new SequenceFile.Reader(fs, new Path(fname), conf);
+ System.out.println("key class name is " + r.getKeyClassName());
+ System.out.println("value class name is " + r.getValueClassName());
+
+ ChukwaArchiveKey key = new ChukwaArchiveKey();
+ ChunkImpl evt = ChunkImpl.getBlankChunk();
+ int events = 0;
+ while (r.next(key, evt) && (events < 5)) {
+ if (!Writable.class.isAssignableFrom(key.getClass()))
+ System.out.println("warning: keys aren't writable");
+
+ if (!Writable.class.isAssignableFrom(evt.getClass()))
+ System.out.println("warning: values aren't writable");
+
+ if (evt.getData().length > 1000) {
+ System.out.println("got event; data: "
+ + new String(evt.getData(), 0, 1000));
+ System.out.println("....[truncating]");
+ } else
+ System.out.println("got event; data: " + new String(evt.getData()));
+ events++;
+ }
+ System.out.println("file looks OK!");
+ } catch (Exception e) {
e.printStackTrace();
}
-
+
}
}