You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by as...@apache.org on 2009/03/11 23:39:32 UTC
svn commit: r752666 [3/16] - in /hadoop/chukwa/trunk: ./
src/java/org/apache/hadoop/chukwa/ src/java/org/apache/hadoop/chukwa/conf/
src/java/org/apache/hadoop/chukwa/database/
src/java/org/apache/hadoop/chukwa/datacollection/
src/java/org/apache/hadoop...
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/CharFileTailingAdaptorUTF8.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/CharFileTailingAdaptorUTF8.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/CharFileTailingAdaptorUTF8.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/CharFileTailingAdaptorUTF8.java Wed Mar 11 22:39:26 2009
@@ -18,59 +18,57 @@
package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
import java.util.ArrayList;
/**
- * A subclass of FileTailingAdaptor that reads UTF8/ascii
- * files and splits records at carriage returns.
- *
+ * A subclass of FileTailingAdaptor that reads UTF8/ascii files and splits
+ * records at carriage returns.
+ *
*/
public class CharFileTailingAdaptorUTF8 extends FileTailingAdaptor {
-
-
private static final char SEPARATOR = '\n';
-
+
private ArrayList<Integer> offsets = new ArrayList<Integer>();
-
+
/**
*
* Note: this method uses a temporary ArrayList (shared across instances).
* This means we're copying ints each time. This could be a performance issue.
- * Also, 'offsets' never shrinks, and will be of size proportional to the
+ * Also, 'offsets' never shrinks, and will be of size proportional to the
* largest number of lines ever seen in an event.
*/
@Override
- protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile, byte[] buf)
- throws InterruptedException
- {
- for(int i = 0; i < buf.length; ++i) {
- if(buf[i] == SEPARATOR) {
- offsets.add(i);
- }
+ protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile,
+ byte[] buf) throws InterruptedException {
+ for (int i = 0; i < buf.length; ++i) {
+ if (buf[i] == SEPARATOR) {
+ offsets.add(i);
}
+ }
+
+ if (offsets.size() > 0) {
+ int[] offsets_i = new int[offsets.size()];
+ for (int i = 0; i < offsets_i.length; ++i)
+ offsets_i[i] = offsets.get(i);
+
+ int bytesUsed = offsets_i[offsets_i.length - 1] + 1; // char at last
+ // offset uses a byte
+ assert bytesUsed > 0 : " shouldn't send empty events";
+ ChunkImpl event = new ChunkImpl(type, toWatch.getAbsolutePath(),
+ buffOffsetInFile + bytesUsed, buf, this);
+
+ event.setRecordOffsets(offsets_i);
+ eq.add(event);
+
+ offsets.clear();
+ return bytesUsed;
+ } else
+ return 0;
- if(offsets.size() > 0) {
- int[] offsets_i = new int[offsets.size()];
- for(int i = 0; i < offsets_i.length ; ++i)
- offsets_i[i] = offsets.get(i);
-
- int bytesUsed = offsets_i[offsets_i.length-1] + 1; //char at last offset uses a byte
- assert bytesUsed > 0: " shouldn't send empty events";
- ChunkImpl event = new ChunkImpl(type, toWatch.getAbsolutePath(),buffOffsetInFile + bytesUsed, buf, this );
-
- event.setRecordOffsets(offsets_i);
- eq.add(event);
-
- offsets.clear();
- return bytesUsed;
- }
- else
- return 0;
-
}
-
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/CharFileTailingAdaptorUTF8NewLineEscaped.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/CharFileTailingAdaptorUTF8NewLineEscaped.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/CharFileTailingAdaptorUTF8NewLineEscaped.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/CharFileTailingAdaptorUTF8NewLineEscaped.java Wed Mar 11 22:39:26 2009
@@ -18,74 +18,73 @@
package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
-import org.apache.hadoop.chukwa.util.RecordConstants;
-
+import org.apache.hadoop.chukwa.util.RecordConstants;
import java.util.ArrayList;
-
/**
- * A subclass of FileTailingAdaptor that reads UTF8/ascii
- * files and splits records at non-escaped carriage returns
- *
+ * A subclass of FileTailingAdaptor that reads UTF8/ascii files and splits
+ * records at non-escaped carriage returns
+ *
*/
-public class CharFileTailingAdaptorUTF8NewLineEscaped extends FileTailingAdaptor {
-
+public class CharFileTailingAdaptorUTF8NewLineEscaped extends
+ FileTailingAdaptor {
private static final char SEPARATOR = '\n';
-
+
private ArrayList<Integer> offsets = new ArrayList<Integer>();
-
+
/**
*
* Note: this method uses a temporary ArrayList (shared across instances).
* This means we're copying ints each time. This could be a performance issue.
- * Also, 'offsets' never shrinks, and will be of size proportional to the
+ * Also, 'offsets' never shrinks, and will be of size proportional to the
* largest number of lines ever seen in an event.
*/
@Override
- protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile, byte[] buf)
- throws InterruptedException
- {
+ protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile,
+ byte[] buf) throws InterruptedException {
String es = RecordConstants.RECORD_SEPARATOR_ESCAPE_SEQ;
- for(int i = 0; i < buf.length; ++i) {
- // if this is a separator
- if(buf[i] == SEPARATOR){
+ for (int i = 0; i < buf.length; ++i) {
+ // if this is a separator
+ if (buf[i] == SEPARATOR) {
// if possibly preceded by escape sequence (avoid outOfBounds here)
boolean escaped = false; // was it escaped?
- if (i-es.length() >= 0){
- escaped = true; // maybe (at least there was room for an escape sequence, so let's check for the e.s.)
- for (int j = 0; j < es.length(); j++){
- if (buf[i-es.length()+j] != es.charAt(j)){
+ if (i - es.length() >= 0) {
+ escaped = true; // maybe (at least there was room for an escape
+ // sequence, so let's check for the e.s.)
+ for (int j = 0; j < es.length(); j++) {
+ if (buf[i - es.length() + j] != es.charAt(j)) {
escaped = false;
}
}
}
- if (!escaped){
+ if (!escaped) {
offsets.add(i);
}
}
}
- if(offsets.size() > 0) {
+ if (offsets.size() > 0) {
int[] offsets_i = new int[offsets.size()];
- for(int i = 0; i < offsets_i.length ; ++i)
+ for (int i = 0; i < offsets_i.length; ++i)
offsets_i[i] = offsets.get(i);
- //make the stream unique to this adaptor
- int bytesUsed = offsets_i[offsets_i.length-1] + 1; //char at last offset uses a byte
- assert bytesUsed > 0: " shouldn't send empty events";
+ // make the stream unique to this adaptor
+ int bytesUsed = offsets_i[offsets_i.length - 1] + 1; // char at last
+ // offset uses a byte
+ assert bytesUsed > 0 : " shouldn't send empty events";
ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(),
- buffOffsetInFile + bytesUsed, buf,this);
-
+ buffOffsetInFile + bytesUsed, buf, this);
+
chunk.setSeqID(buffOffsetInFile + bytesUsed);
chunk.setRecordOffsets(offsets_i);
eq.add(chunk);
-
+
offsets.clear();
return bytesUsed;
- }
- else
+ } else
return 0;
}
@@ -93,5 +92,4 @@
return "escaped newline CFTA-UTF8";
}
-
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java Wed Mar 11 22:39:26 2009
@@ -18,9 +18,9 @@
package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
-
import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
import org.apache.hadoop.chukwa.datacollection.DataFactory;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
@@ -28,57 +28,59 @@
import org.apache.log4j.Logger;
/**
- * A shared thread used by all FileTailingAdaptors.
+ * A shared thread used by all FileTailingAdaptors.
*
- * For now, it tries each file in succession. If it gets through every
- * file within two seconds, and no more data remains, it will sleep.
+ * For now, it tries each file in succession. If it gets through every file
+ * within two seconds, and no more data remains, it will sleep.
*
* If there was still data available in any file, the adaptor will loop again.
- *
+ *
*/
class FileTailer extends Thread {
static Logger log = Logger.getLogger(FileTailer.class);
-
- private List<FileTailingAdaptor> adaptors;
+
+ private List<FileTailingAdaptor> adaptors;
private volatile boolean isRunning = true;
- ChunkQueue eq; //not private -- useful for file tailing adaptor classes
-
+ ChunkQueue eq; // not private -- useful for file tailing adaptor classes
+
/**
* How often to tail each file.
*/
- int DEFAULT_SAMPLE_PERIOD_MS = 1000* 2;
+ int DEFAULT_SAMPLE_PERIOD_MS = 1000 * 2;
int SAMPLE_PERIOD_MS = DEFAULT_SAMPLE_PERIOD_MS;
private static Configuration conf = null;
-
+
FileTailer() {
- if (conf == null) {
- ChukwaAgent agent = ChukwaAgent.getAgent();
- if (agent != null) {
- conf = agent.getConfiguration();
- if (conf != null) {
- SAMPLE_PERIOD_MS= conf.getInt("chukwaAgent.adaptor.context.switch.time", DEFAULT_SAMPLE_PERIOD_MS);
+ if (conf == null) {
+ ChukwaAgent agent = ChukwaAgent.getAgent();
+ if (agent != null) {
+ conf = agent.getConfiguration();
+ if (conf != null) {
+ SAMPLE_PERIOD_MS = conf.getInt(
+ "chukwaAgent.adaptor.context.switch.time",
+ DEFAULT_SAMPLE_PERIOD_MS);
}
- }
- }
+ }
+ }
eq = DataFactory.getInstance().getEventQueue();
-
- //iterations are much more common than adding a new adaptor
+
+ // iterations are much more common than adding a new adaptor
adaptors = new CopyOnWriteArrayList<FileTailingAdaptor>();
this.setDaemon(true);
- start();//start the file-tailing thread
+ start();// start the file-tailing thread
+ }
+
+ // called by FileTailingAdaptor, only
+ void startWatchingFile(FileTailingAdaptor f) {
+ adaptors.add(f);
}
-
- //called by FileTailingAdaptor, only
- void startWatchingFile(FileTailingAdaptor f) {
- adaptors.add(f);
- }
-
- //called by FileTailingAdaptor, only
- void stopWatchingFile(FileTailingAdaptor f) {
- adaptors.remove(f);
- }
-
+
+ // called by FileTailingAdaptor, only
+ void stopWatchingFile(FileTailingAdaptor f) {
+ adaptors.remove(f);
+ }
+
public void run() {
while (isRunning) {
try {
@@ -98,6 +100,5 @@
}
}
}
-
-
+
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java Wed Mar 11 22:39:26 2009
@@ -18,12 +18,12 @@
package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
@@ -34,298 +34,313 @@
/**
* An adaptor that repeatedly tails a specified file, sending the new bytes.
- * This class does not split out records, but just sends everything up to end of file.
- * Subclasses can alter this behavior by overriding extractRecords().
+ * This class does not split out records, but just sends everything up to end of
+ * file. Subclasses can alter this behavior by overriding extractRecords().
*
*/
-public class FileTailingAdaptor implements Adaptor
-{
+public class FileTailingAdaptor implements Adaptor {
+
+ static Logger log;
+
+ /**
+ * This is the maximum amount we'll read from any one file before moving on to
+ * the next. This way, we get quick response time for other files if one file
+ * is growing rapidly.
+ */
+ public static final int DEFAULT_MAX_READ_SIZE = 128 * 1024;
+ public static int MAX_READ_SIZE = DEFAULT_MAX_READ_SIZE;
+ public static int MAX_RETRIES = 300;
+ public static int GRACEFUL_PERIOD = 3 * 60 * 1000; // 3 minutes
+
+ protected static Configuration conf = null;
+ private int attempts = 0;
+ private long gracefulPeriodExpired = 0l;
+ private boolean adaptorInError = false;
+ File toWatch;
+ /**
+ * next PHYSICAL offset to read
+ */
+ protected long fileReadOffset;
+ protected String type;
+ private ChunkReceiver dest;
+ protected RandomAccessFile reader = null;
+ protected long adaptorID;
+
+ /**
+ * The logical offset of the first byte of the file
+ */
+ private long offsetOfFirstByte = 0;
+
+ private static FileTailer tailer;
+
+ static {
+ tailer = new FileTailer();
+ log = Logger.getLogger(FileTailingAdaptor.class);
+ }
- static Logger log;
+ public void start(long adaptorID, String type, String params, long bytes,
+ ChunkReceiver dest) {
+ // in this case params = filename
+ this.adaptorID = adaptorID;
+ this.type = type;
+ this.dest = dest;
+ this.attempts = 0;
+
+ Pattern cmd = Pattern.compile("(\\d+)\\s+(.+)\\s?");
+ Matcher m = cmd.matcher(params);
+ if (m.matches()) {
+ offsetOfFirstByte = Long.parseLong(m.group(1));
+ toWatch = new File(m.group(2));
+ } else {
+ toWatch = new File(params.trim());
+ }
+ log.info("started file tailer on file " + toWatch
+ + " with first byte at offset " + offsetOfFirstByte);
- /**
- * This is the maximum amount we'll read from any one file before moving on
- * to the next. This way, we get quick response time for other files if one
- * file is growing rapidly.
- */
- public static final int DEFAULT_MAX_READ_SIZE = 128 * 1024 ;
- public static int MAX_READ_SIZE = DEFAULT_MAX_READ_SIZE ;
- public static int MAX_RETRIES = 300;
- public static int GRACEFUL_PERIOD = 3 * 60 * 1000; // 3 minutes
-
- protected static Configuration conf = null;
- private int attempts = 0;
- private long gracefulPeriodExpired = 0l;
- private boolean adaptorInError = false;
- File toWatch;
- /**
- * next PHYSICAL offset to read
- */
- protected long fileReadOffset;
- protected String type;
- private ChunkReceiver dest;
- protected RandomAccessFile reader = null;
- protected long adaptorID;
-
- /**
- * The logical offset of the first byte of the file
- */
- private long offsetOfFirstByte = 0;
-
- private static FileTailer tailer;
-
- static {
- tailer = new FileTailer();
- log =Logger.getLogger(FileTailingAdaptor.class);
- }
-
- public void start(long adaptorID, String type, String params, long bytes, ChunkReceiver dest) {
- //in this case params = filename
- this.adaptorID = adaptorID;
- this.type = type;
- this.dest = dest;
- this.attempts = 0;
-
- Pattern cmd = Pattern.compile("(\\d+)\\s+(.+)\\s?");
- Matcher m = cmd.matcher(params);
- if(m.matches()) {
- offsetOfFirstByte = Long.parseLong(m.group(1));
- toWatch = new File(m.group(2));
- } else {
- toWatch = new File(params.trim());
- }
- log.info("started file tailer on file " + toWatch + " with first byte at offset "+offsetOfFirstByte);
-
- this.fileReadOffset= bytes;
- tailer.startWatchingFile(this);
- }
-
- /**
- * Do one last tail, and then stop
- * @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#shutdown()
- */
- public long shutdown() throws AdaptorException {
- try{
- if(toWatch.exists()) {
- int retry=0;
- tailer.stopWatchingFile(this);
- TerminatorThread lastTail = new TerminatorThread(this,tailer.eq);
- lastTail.setDaemon(true);
- lastTail.start();
- while(lastTail.isAlive() && retry < 60) {
- try {
- log.info("Retry:"+retry);
- Thread.currentThread().sleep(1000);
- retry++;
- } catch(InterruptedException ex) {
- }
- }
- }
- } finally {
- return fileReadOffset + offsetOfFirstByte;
- }
-
- }
- /**
- * Stop tailing the file, effective immediately.
- */
- public void hardStop() throws AdaptorException {
+ this.fileReadOffset = bytes;
+ tailer.startWatchingFile(this);
+ }
+
+ /**
+ * Do one last tail, and then stop
+ *
+ * @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#shutdown()
+ */
+ public long shutdown() throws AdaptorException {
+ try {
+ if (toWatch.exists()) {
+ int retry = 0;
tailer.stopWatchingFile(this);
- }
+ TerminatorThread lastTail = new TerminatorThread(this, tailer.eq);
+ lastTail.setDaemon(true);
+ lastTail.start();
+ while (lastTail.isAlive() && retry < 60) {
+ try {
+ log.info("Retry:" + retry);
+ Thread.currentThread().sleep(1000);
+ retry++;
+ } catch (InterruptedException ex) {
+ }
+ }
+ }
+ } finally {
+ return fileReadOffset + offsetOfFirstByte;
+ }
+
+ }
+
+ /**
+ * Stop tailing the file, effective immediately.
+ */
+ public void hardStop() throws AdaptorException {
+ tailer.stopWatchingFile(this);
+ }
/**
* @see org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor#getCurrentStatus()
*/
- public String getCurrentStatus() {
- return type.trim() + " " + offsetOfFirstByte+ " " + toWatch.getPath() + " " + fileReadOffset;
- // can make this more efficient using a StringBuilder
- }
-
- public String toString() {
- return "Tailer on " + toWatch;
- }
-
- public String getStreamName() {
- return toWatch.getPath();
- }
-
- /**
- * Looks at the tail of the associated file, adds some of it to event queue
- * This method is not thread safe. Returns true if there's more data in the
- * file
- *
- * @param eq the queue to write Chunks into
- */
- public synchronized boolean tailFile(ChunkReceiver eq) throws InterruptedException {
+ public String getCurrentStatus() {
+ return type.trim() + " " + offsetOfFirstByte + " " + toWatch.getPath()
+ + " " + fileReadOffset;
+ // can make this more efficient using a StringBuilder
+ }
+
+ public String toString() {
+ return "Tailer on " + toWatch;
+ }
+
+ public String getStreamName() {
+ return toWatch.getPath();
+ }
+
+ /**
+ * Looks at the tail of the associated file, adds some of it to event queue
+ * This method is not thread safe. Returns true if there's more data in the
+ * file
+ *
+ * @param eq the queue to write Chunks into
+ */
+ public synchronized boolean tailFile(ChunkReceiver eq)
+ throws InterruptedException {
boolean hasMoreData = false;
-
- try {
- if( (adaptorInError == true) && (System.currentTimeMillis() > gracefulPeriodExpired)) {
- if (!toWatch.exists()) {
- log.warn("Adaptor|" + adaptorID +"|attempts=" + attempts + "| File does not exist: "+toWatch.getAbsolutePath()+", streaming policy expired. File removed from streaming.");
- } else if (!toWatch.canRead()) {
- log.warn("Adaptor|" + adaptorID +"|attempts=" + attempts + "| File cannot be read: "+toWatch.getAbsolutePath()+", streaming policy expired. File removed from streaming.");
- } else {
- // Should have never been there
- adaptorInError = false;
- gracefulPeriodExpired = 0L;
- attempts = 0;
- return false;
- }
-
-
- ChukwaAgent agent = ChukwaAgent.getAgent();
- if (agent != null) {
- agent.stopAdaptor(adaptorID, false);
- } else {
- log.info("Agent is null, running in default mode");
- }
- return false;
-
- } else if(!toWatch.exists() || !toWatch.canRead()) {
- if (adaptorInError == false) {
- long now = System.currentTimeMillis();
- gracefulPeriodExpired = now + GRACEFUL_PERIOD;
- adaptorInError = true;
- attempts = 0;
- log.warn("failed to stream data for: "+toWatch.getAbsolutePath()+", graceful period will Expire at now:" + now
- + " + " + GRACEFUL_PERIOD + " secs, i.e:" + gracefulPeriodExpired);
- } else if (attempts%10 == 0) {
- log.info("failed to stream data for: "+toWatch.getAbsolutePath()+", attempt: "+attempts);
- }
-
- attempts++;
- return false; //no more data
- }
-
- if (reader == null)
- {
- reader = new RandomAccessFile(toWatch, "r");
- log.info("Adaptor|" + adaptorID + "|Opening the file for the first time|seek|" + fileReadOffset);
- }
-
- long len = 0L;
- try {
- RandomAccessFile newReader = new RandomAccessFile(toWatch,"r");
- len = reader.length();
- long newLength = newReader.length();
- if(newLength<len && fileReadOffset >= len) {
- reader.close();
- reader = newReader;
- fileReadOffset=0L;
- log.debug("Adaptor|" + adaptorID +"| File size mismatched, rotating: "+toWatch.getAbsolutePath());
- } else {
- try {
- newReader.close();
- } catch(IOException e) {
- // do nothing.
- }
- }
- } catch(IOException e) {
- // do nothing, if file doesn't exist.
- }
- if (len >= fileReadOffset) {
- if(offsetOfFirstByte>fileReadOffset) {
- // If the file rotated, the recorded offsetOfFirstByte is greater than file size,
- // reset the first byte position to beginning of the file.
- fileReadOffset=0;
- offsetOfFirstByte = 0L;
- log.warn("offsetOfFirstByte>fileReadOffset, resetting offset to 0");
- }
-
- log.debug("Adaptor|" + adaptorID + "|seeking|" + fileReadOffset );
- reader.seek(fileReadOffset);
-
- long bufSize = len - fileReadOffset;
-
- if (conf == null)
- {
- ChukwaAgent agent = ChukwaAgent.getAgent();
- if (agent != null)
- {
- conf = agent.getConfiguration();
- if (conf != null)
- {
- MAX_READ_SIZE= conf.getInt("chukwaAgent.fileTailingAdaptor.maxReadSize", DEFAULT_MAX_READ_SIZE);
- log.info("chukwaAgent.fileTailingAdaptor.maxReadSize: " + MAX_READ_SIZE);
- }
- else
- {
- log.info("Conf is null, running in default mode");
- }
- }
- else
- {
- log.info("Agent is null, running in default mode");
- }
- }
-
- if (bufSize > MAX_READ_SIZE) {
- bufSize = MAX_READ_SIZE;
- hasMoreData = true;
- }
- byte[] buf = new byte[(int) bufSize];
-
-
- long curOffset = fileReadOffset;
-
- int bufferRead = reader.read(buf);
- assert reader.getFilePointer() == fileReadOffset + bufSize : " event size arithmetic is broken: "
- + " pointer is "
- + reader.getFilePointer()
- + " but offset is " + fileReadOffset + bufSize;
-
- int bytesUsed = extractRecords(dest, fileReadOffset + offsetOfFirstByte, buf);
-
- // === WARNING ===
- // If we couldn't found a complete record AND
- // we cannot read more, i.e bufferRead == MAX_READ_SIZE
- // it's because the record is too BIG
- // So log.warn, and drop current buffer so we can keep moving
- // instead of being stopped at that point for ever
- if ( bytesUsed == 0 && bufferRead == MAX_READ_SIZE) {
- log.warn("bufferRead == MAX_READ_SIZE AND bytesUsed == 0, droping current buffer: startOffset="
- + curOffset + ", MAX_READ_SIZE=" + MAX_READ_SIZE + ", for " + toWatch.getPath());
- bytesUsed = buf.length;
- }
-
- fileReadOffset = fileReadOffset + bytesUsed;
-
-
- log.debug("Adaptor|" + adaptorID + "|start|" + curOffset + "|end|"+ fileReadOffset);
-
-
- } else {
- // file has rotated and no detection
- reader.close();
- reader=null;
- fileReadOffset = 0L;
- offsetOfFirstByte = 0L;
- hasMoreData = true;
- log.warn("Adaptor|" + adaptorID +"| file: " + toWatch.getPath() +", has rotated and no detection - reset counters to 0L");
- }
- } catch (IOException e) {
- log.warn("failure reading " + toWatch, e);
- }
- attempts=0;
- adaptorInError = false;
- return hasMoreData;
- }
-
+ try {
+ if ((adaptorInError == true)
+ && (System.currentTimeMillis() > gracefulPeriodExpired)) {
+ if (!toWatch.exists()) {
+ log.warn("Adaptor|" + adaptorID + "|attempts=" + attempts
+ + "| File does not exist: " + toWatch.getAbsolutePath()
+ + ", streaming policy expired. File removed from streaming.");
+ } else if (!toWatch.canRead()) {
+ log.warn("Adaptor|" + adaptorID + "|attempts=" + attempts
+ + "| File cannot be read: " + toWatch.getAbsolutePath()
+ + ", streaming policy expired. File removed from streaming.");
+ } else {
+ // Should have never been there
+ adaptorInError = false;
+ gracefulPeriodExpired = 0L;
+ attempts = 0;
+ return false;
+ }
+
+ ChukwaAgent agent = ChukwaAgent.getAgent();
+ if (agent != null) {
+ agent.stopAdaptor(adaptorID, false);
+ } else {
+ log.info("Agent is null, running in default mode");
+ }
+ return false;
+
+ } else if (!toWatch.exists() || !toWatch.canRead()) {
+ if (adaptorInError == false) {
+ long now = System.currentTimeMillis();
+ gracefulPeriodExpired = now + GRACEFUL_PERIOD;
+ adaptorInError = true;
+ attempts = 0;
+ log.warn("failed to stream data for: " + toWatch.getAbsolutePath()
+ + ", graceful period will Expire at now:" + now + " + "
+ + GRACEFUL_PERIOD + " secs, i.e:" + gracefulPeriodExpired);
+ } else if (attempts % 10 == 0) {
+ log.info("failed to stream data for: " + toWatch.getAbsolutePath()
+ + ", attempt: " + attempts);
+ }
+
+ attempts++;
+ return false; // no more data
+ }
+
+ if (reader == null) {
+ reader = new RandomAccessFile(toWatch, "r");
+ log.info("Adaptor|" + adaptorID
+ + "|Opening the file for the first time|seek|" + fileReadOffset);
+ }
+
+ long len = 0L;
+ try {
+ RandomAccessFile newReader = new RandomAccessFile(toWatch, "r");
+ len = reader.length();
+ long newLength = newReader.length();
+ if (newLength < len && fileReadOffset >= len) {
+ reader.close();
+ reader = newReader;
+ fileReadOffset = 0L;
+ log.debug("Adaptor|" + adaptorID
+ + "| File size mismatched, rotating: "
+ + toWatch.getAbsolutePath());
+ } else {
+ try {
+ newReader.close();
+ } catch (IOException e) {
+ // do nothing.
+ }
+ }
+ } catch (IOException e) {
+ // do nothing, if file doesn't exist.
+ }
+ if (len >= fileReadOffset) {
+ if (offsetOfFirstByte > fileReadOffset) {
+ // If the file rotated, the recorded offsetOfFirstByte is greater than
+ // file size,
+ // reset the first byte position to beginning of the file.
+ fileReadOffset = 0;
+ offsetOfFirstByte = 0L;
+ log.warn("offsetOfFirstByte>fileReadOffset, resetting offset to 0");
+ }
+
+ log.debug("Adaptor|" + adaptorID + "|seeking|" + fileReadOffset);
+ reader.seek(fileReadOffset);
+
+ long bufSize = len - fileReadOffset;
+
+ if (conf == null) {
+ ChukwaAgent agent = ChukwaAgent.getAgent();
+ if (agent != null) {
+ conf = agent.getConfiguration();
+ if (conf != null) {
+ MAX_READ_SIZE = conf.getInt(
+ "chukwaAgent.fileTailingAdaptor.maxReadSize",
+ DEFAULT_MAX_READ_SIZE);
+ log.info("chukwaAgent.fileTailingAdaptor.maxReadSize: "
+ + MAX_READ_SIZE);
+ } else {
+ log.info("Conf is null, running in default mode");
+ }
+ } else {
+ log.info("Agent is null, running in default mode");
+ }
+ }
+
+ if (bufSize > MAX_READ_SIZE) {
+ bufSize = MAX_READ_SIZE;
+ hasMoreData = true;
+ }
+ byte[] buf = new byte[(int) bufSize];
+
+ long curOffset = fileReadOffset;
+
+ int bufferRead = reader.read(buf);
+ assert reader.getFilePointer() == fileReadOffset + bufSize : " event size arithmetic is broken: "
+ + " pointer is "
+ + reader.getFilePointer()
+ + " but offset is "
+ + fileReadOffset + bufSize;
+
+ int bytesUsed = extractRecords(dest,
+ fileReadOffset + offsetOfFirstByte, buf);
+
+ // === WARNING ===
+ // If we couldn't found a complete record AND
+ // we cannot read more, i.e bufferRead == MAX_READ_SIZE
+ // it's because the record is too BIG
+ // So log.warn, and drop current buffer so we can keep moving
+ // instead of being stopped at that point for ever
+ if (bytesUsed == 0 && bufferRead == MAX_READ_SIZE) {
+ log
+ .warn("bufferRead == MAX_READ_SIZE AND bytesUsed == 0, droping current buffer: startOffset="
+ + curOffset
+ + ", MAX_READ_SIZE="
+ + MAX_READ_SIZE
+ + ", for "
+ + toWatch.getPath());
+ bytesUsed = buf.length;
+ }
+
+ fileReadOffset = fileReadOffset + bytesUsed;
+
+ log.debug("Adaptor|" + adaptorID + "|start|" + curOffset + "|end|"
+ + fileReadOffset);
+
+ } else {
+ // file has rotated and no detection
+ reader.close();
+ reader = null;
+ fileReadOffset = 0L;
+ offsetOfFirstByte = 0L;
+ hasMoreData = true;
+ log.warn("Adaptor|" + adaptorID + "| file: " + toWatch.getPath()
+ + ", has rotated and no detection - reset counters to 0L");
+ }
+ } catch (IOException e) {
+ log.warn("failure reading " + toWatch, e);
+ }
+ attempts = 0;
+ adaptorInError = false;
+ return hasMoreData;
+ }
+
/**
* Extract records from a byte sequence
+ *
* @param eq the queue to stick the new chunk[s] in
* @param buffOffsetInFile the byte offset in the stream at which buf[] begins
* @param buf the byte buffer to extract records from
* @return the number of bytes processed
* @throws InterruptedException
*/
- protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile, byte[] buf)
- throws InterruptedException
- {
- ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(), buffOffsetInFile + buf.length,
- buf, this);
+ protected int extractRecords(ChunkReceiver eq, long buffOffsetInFile,
+ byte[] buf) throws InterruptedException {
+ ChunkImpl chunk = new ChunkImpl(type, toWatch.getAbsolutePath(),
+ buffOffsetInFile + buf.length, buf, this);
eq.add(chunk);
return buf.length;
@@ -336,5 +351,4 @@
return type;
}
-
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TerminatorThread.java Wed Mar 11 22:39:26 2009
@@ -1,47 +1,57 @@
package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
import org.apache.hadoop.chukwa.datacollection.adaptor.FileAdaptor;
import org.apache.log4j.Logger;
public class TerminatorThread extends Thread {
- private static Logger log =Logger.getLogger(FileAdaptor.class);
+ private static Logger log = Logger.getLogger(FileAdaptor.class);
+
+ private FileTailingAdaptor adaptor = null;
+ private ChunkReceiver eq = null;
- private FileTailingAdaptor adaptor = null;
- private ChunkReceiver eq = null;
-
- public TerminatorThread(FileTailingAdaptor adaptor, ChunkReceiver eq) {
- this.adaptor = adaptor;
- this.eq = eq;
- }
+ public TerminatorThread(FileTailingAdaptor adaptor, ChunkReceiver eq) {
+ this.adaptor = adaptor;
+ this.eq = eq;
+ }
public void run() {
-
- long endTime = System.currentTimeMillis() + (10*60*1000); // now + 10 mins
+
+ long endTime = System.currentTimeMillis() + (10 * 60 * 1000); // now + 10
+ // mins
int count = 0;
log.info("Terminator thread started." + adaptor.toWatch.getPath());
try {
while (adaptor.tailFile(eq)) {
if (log.isDebugEnabled()) {
- log.debug("Terminator thread:" + adaptor.toWatch.getPath() + " still working");
+ log.debug("Terminator thread:" + adaptor.toWatch.getPath()
+ + " still working");
}
long now = System.currentTimeMillis();
- if (now > endTime ) {
- log.warn("TerminatorThread should have been finished by now! count=" + count);
- count ++;
- endTime = System.currentTimeMillis() + (10*60*1000); // now + 10 mins
- if (count >3 ) {
- log.warn("TerminatorThread should have been finished by now, stopping it now! count=" + count);
+ if (now > endTime) {
+ log.warn("TerminatorThread should have been finished by now! count="
+ + count);
+ count++;
+ endTime = System.currentTimeMillis() + (10 * 60 * 1000); // now + 10
+ // mins
+ if (count > 3) {
+ log
+ .warn("TerminatorThread should have been finished by now, stopping it now! count="
+ + count);
break;
}
}
}
} catch (InterruptedException e) {
- log.info("InterruptedException on Terminator thread:" + adaptor.toWatch.getPath(),e);
+ log.info("InterruptedException on Terminator thread:"
+ + adaptor.toWatch.getPath(), e);
} catch (Throwable e) {
- log.warn("Exception on Terminator thread:" + adaptor.toWatch.getPath(),e);
+ log
+ .warn("Exception on Terminator thread:" + adaptor.toWatch.getPath(),
+ e);
}
-
+
log.info("Terminator thread finished." + adaptor.toWatch.getPath());
try {
adaptor.reader.close();
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorFactory.java Wed Mar 11 22:39:26 2009
@@ -18,6 +18,7 @@
package org.apache.hadoop.chukwa.datacollection.agent;
+
import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
import org.apache.log4j.Logger;
@@ -26,47 +27,49 @@
*
*/
public class AdaptorFactory {
-
+
static Logger log = Logger.getLogger(ChukwaAgent.class);
- /**
- * Instantiate an adaptor that can be added by the {@link ChukwaAgent}
- * @param className the name of the {@link Adaptor} class to instantiate
- * @return an Adaptor of the specified type
- */
- static Adaptor createAdaptor(String className){
+
+ /**
+ * Instantiate an adaptor that can be added by the {@link ChukwaAgent}
+ *
+ * @param className the name of the {@link Adaptor} class to instantiate
+ * @return an Adaptor of the specified type
+ */
+ static Adaptor createAdaptor(String className) {
Object obj = null;
- try{
- //the following reflection business for type checking is probably unnecessary
- //since it will just throw a ClassCastException on error anyway.
+ try {
+ // the following reflection business for type checking is probably
+ // unnecessary
+ // since it will just throw a ClassCastException on error anyway.
obj = Class.forName(className).newInstance();
- if (Adaptor.class.isInstance(obj)){
+ if (Adaptor.class.isInstance(obj)) {
return (Adaptor) obj;
- }
- else
+ } else
return null;
- } catch (Exception e1){
- log.warn("Error instantiating new adaptor by class name, " +
- "attempting again, but with default chukwa package prepended, i.e. " +
- "org.apache.hadoop.chukwa.datacollection.adaptor." + className + ". "
- + e1);
- try{
- //if failed, try adding default class prefix
+ } catch (Exception e1) {
+ log
+ .warn("Error instantiating new adaptor by class name, "
+ + "attempting again, but with default chukwa package prepended, i.e. "
+ + "org.apache.hadoop.chukwa.datacollection.adaptor." + className
+ + ". " + e1);
+ try {
+ // if failed, try adding default class prefix
Object obj2 = Class.forName(
- "org.apache.hadoop.chukwa.datacollection.adaptor." +
- className).newInstance();
- if (Adaptor.class.isInstance(obj2)){
- log.debug("Succeeded in finding class by adding default chukwa " +
- "namespace prefix to class name profided");
+ "org.apache.hadoop.chukwa.datacollection.adaptor." + className)
+ .newInstance();
+ if (Adaptor.class.isInstance(obj2)) {
+ log.debug("Succeeded in finding class by adding default chukwa "
+ + "namespace prefix to class name profided");
return (Adaptor) obj2;
- }
- else
+ } else
return null;
} catch (Exception e2) {
- System.out.println("Also error instantiating new adaptor by classname" +
- "with prefix added" + e2);
+ System.out.println("Also error instantiating new adaptor by classname"
+ + "with prefix added" + e2);
return null;
}
}
}
-
+
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java Wed Mar 11 22:39:26 2009
@@ -18,6 +18,7 @@
package org.apache.hadoop.chukwa.datacollection.agent;
+
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.IOException;
@@ -28,7 +29,6 @@
import java.net.Socket;
import java.net.SocketException;
import java.util.Map;
-
import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
import org.apache.log4j.Logger;
@@ -39,8 +39,8 @@
* programmatically and via telnet.
*
* The port to bind to can be specified by setting option
- * chukwaAgent.agent.control.port.
- * A port of 0 creates a socket on any free port.
+ * chukwaAgent.agent.control.port. A port of 0 creates a socket on any free
+ * port.
*/
public class AgentControlSocketListener extends Thread {
@@ -63,7 +63,8 @@
try {
InputStream in = connection.getInputStream();
BufferedReader br = new BufferedReader(new InputStreamReader(in));
- PrintStream out = new PrintStream(new BufferedOutputStream(connection.getOutputStream()));
+ PrintStream out = new PrintStream(new BufferedOutputStream(connection
+ .getOutputStream()));
String cmd = null;
while ((cmd = br.readLine()) != null) {
processCommand(cmd, out);
@@ -89,11 +90,13 @@
public void processCommand(String cmd, PrintStream out) throws IOException {
String[] words = cmd.split("\\s+");
if (log.isDebugEnabled()) {
- log.debug("command from " + connection.getRemoteSocketAddress() + ":"+ cmd);
+ log.debug("command from " + connection.getRemoteSocketAddress() + ":"
+ + cmd);
}
if (words[0].equalsIgnoreCase("help")) {
- out.println("you're talking to the Chukwa agent. Commands available: ");
+ out
+ .println("you're talking to the Chukwa agent. Commands available: ");
out.println("add [adaptorname] [args] [offset] -- start an adaptor");
out.println("shutdown [adaptornumber] -- graceful stop");
out.println("stop [adaptornumber] -- abrupt stop");
@@ -177,7 +180,8 @@
this.setDaemon(false); // to keep the local agent alive
this.agent = agent;
- this.portno = agent.getConfiguration().getInt("chukwaAgent.control.port", 9093);
+ this.portno = agent.getConfiguration().getInt("chukwaAgent.control.port",
+ 9093);
log.info("AgentControlSocketListerner ask for port: " + portno);
this.setName("control socket listener");
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java Wed Mar 11 22:39:26 2009
@@ -18,6 +18,7 @@
package org.apache.hadoop.chukwa.datacollection.agent;
+
import org.apache.hadoop.chukwa.datacollection.DataFactory;
import org.apache.hadoop.chukwa.datacollection.adaptor.*;
import org.apache.hadoop.chukwa.datacollection.connector.*;
@@ -27,7 +28,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
-
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
@@ -39,16 +39,14 @@
* be embeddable, for use in testing.
*
*/
-public class ChukwaAgent
-{
- //boolean WRITE_CHECKPOINTS = true;
+public class ChukwaAgent {
+ // boolean WRITE_CHECKPOINTS = true;
static Logger log = Logger.getLogger(ChukwaAgent.class);
static ChukwaAgent agent = null;
private static PidFile pFile = null;
- public static ChukwaAgent getAgent()
- {
+ public static ChukwaAgent getAgent() {
return agent;
}
@@ -56,10 +54,8 @@
Connector connector = null;
// doesn't need an equals(), comparator, etc
- private static class Offset
- {
- public Offset(long l, long id)
- {
+ private static class Offset {
+ public Offset(long l, long id) {
offset = l;
this.id = id;
}
@@ -68,13 +64,11 @@
private volatile long offset;
}
- public static class AlreadyRunningException extends Exception
- {
+ public static class AlreadyRunningException extends Exception {
private static final long serialVersionUID = 1L;
- public AlreadyRunningException()
- {
+ public AlreadyRunningException() {
super("Agent already running; aborting");
}
}
@@ -105,41 +99,37 @@
public int getControllerPort() {
return controlSock.getPort();
}
-
+
/**
* @param args
* @throws AdaptorException
*/
- public static void main(String[] args) throws AdaptorException
- {
+ public static void main(String[] args) throws AdaptorException {
pFile = new PidFile("Agent");
Runtime.getRuntime().addShutdownHook(pFile);
- try
- {
+ try {
if (args.length > 0 && args[0].equals("-help")) {
- System.out.println("usage: LocalAgent [-noCheckPoint]" +
- "[default collector URL]");
+ System.out.println("usage: LocalAgent [-noCheckPoint]"
+ + "[default collector URL]");
System.exit(0);
}
Configuration conf = readConfig();
ChukwaAgent localAgent = new ChukwaAgent(conf);
- if (agent.anotherAgentIsRunning())
- {
+ if (agent.anotherAgentIsRunning()) {
System.out
- .println("another agent is running (or port has been usurped). " +
- "Bailing out now");
+ .println("another agent is running (or port has been usurped). "
+ + "Bailing out now");
System.exit(-1);
}
int uriArgNumber = 0;
- if (args.length > 0)
- {
+ if (args.length > 0) {
if (args[uriArgNumber].equals("local"))
agent.connector = new ConsoleOutConnector(agent);
- else {
+ else {
if (!args[uriArgNumber].contains("://"))
args[uriArgNumber] = "http://" + args[uriArgNumber];
agent.connector = new HttpConnector(agent, args[uriArgNumber]);
@@ -151,40 +141,34 @@
log.info("local agent started on port " + agent.getControlSock().portno);
- } catch (AlreadyRunningException e)
- {
- log
- .error("agent started already on this machine with same portno;" +
- " bailing out");
+ } catch (AlreadyRunningException e) {
+ log.error("agent started already on this machine with same portno;"
+ + " bailing out");
System.out
- .println("agent started already on this machine with same portno;" +
- " bailing out");
+ .println("agent started already on this machine with same portno;"
+ + " bailing out");
System.exit(0); // better safe than sorry
- } catch (Exception e)
- {
+ } catch (Exception e) {
e.printStackTrace();
}
}
- private boolean anotherAgentIsRunning()
- {
+ private boolean anotherAgentIsRunning() {
return !controlSock.isBound();
}
/**
* @return the number of running adaptors inside this local agent
*/
- public int adaptorCount()
- {
+ public int adaptorCount() {
return adaptorsByNumber.size();
}
-
public ChukwaAgent() throws AlreadyRunningException {
this(new Configuration());
}
- public ChukwaAgent(Configuration conf) throws AlreadyRunningException {
+ public ChukwaAgent(Configuration conf) throws AlreadyRunningException {
ChukwaAgent.agent = this;
this.conf = conf;
@@ -193,20 +177,20 @@
adaptorPositions = new ConcurrentHashMap<Adaptor, Offset>();
adaptorsByNumber = new HashMap<Long, Adaptor>();
checkpointNumber = 0;
-
- boolean DO_CHECKPOINT_RESTORE = conf.getBoolean("chukwaAgent.checkpoint.enabled",
- true);
+
+ boolean DO_CHECKPOINT_RESTORE = conf.getBoolean(
+ "chukwaAgent.checkpoint.enabled", true);
CHECKPOINT_BASE_NAME = conf.get("chukwaAgent.checkpoint.name",
"chukwa_checkpoint_");
- final int CHECKPOINT_INTERVAL_MS = conf.getInt("chukwaAgent.checkpoint.interval",
- 5000);
-
- if(conf.get("chukwaAgent.checkpoint.dir") != null)
+ final int CHECKPOINT_INTERVAL_MS = conf.getInt(
+ "chukwaAgent.checkpoint.interval", 5000);
+
+ if (conf.get("chukwaAgent.checkpoint.dir") != null)
checkpointDir = new File(conf.get("chukwaAgent.checkpoint.dir", null));
else
DO_CHECKPOINT_RESTORE = false;
-
- if (checkpointDir!= null && !checkpointDir.exists()) {
+
+ if (checkpointDir != null && !checkpointDir.exists()) {
checkpointDir.mkdirs();
}
tags = conf.get("chukwaAgent.tags", "cluster=\"unknown\"");
@@ -221,11 +205,11 @@
if (DO_CHECKPOINT_RESTORE) {
log.info("checkpoints are enabled, period is " + CHECKPOINT_INTERVAL_MS);
}
-
+
File initialAdaptors = null;
- if(conf.get("chukwaAgent.initial_adaptors") != null)
- initialAdaptors= new File( conf.get("chukwaAgent.initial_adaptors"));
-
+ if (conf.get("chukwaAgent.initial_adaptors") != null)
+ initialAdaptors = new File(conf.get("chukwaAgent.initial_adaptors"));
+
try {
if (DO_CHECKPOINT_RESTORE) {
restoreFromCheckpoint();
@@ -235,35 +219,35 @@
}
try {
- if (initialAdaptors != null && initialAdaptors.exists() && checkpointNumber ==0)
- readAdaptorsFile(initialAdaptors); //don't read after checkpoint restore
+ if (initialAdaptors != null && initialAdaptors.exists()
+ && checkpointNumber == 0)
+ readAdaptorsFile(initialAdaptors); // don't read after checkpoint
+ // restore
} catch (IOException e) {
log.warn("couldn't read user-specified file "
+ initialAdaptors.getAbsolutePath());
}
controlSock = new AgentControlSocketListener(this);
- try
- {
+ try {
controlSock.tryToBind(); // do this synchronously; if it fails, we know
// another agent is running.
controlSock.start(); // this sets us up as a daemon
log.info("control socket started on port " + controlSock.portno);
-
- //shouldn't start checkpointing until we're finishing launching
- //adaptors on boot
- if (CHECKPOINT_INTERVAL_MS > 0 && checkpointDir!= null) {
+ // shouldn't start checkpointing until we're finishing launching
+ // adaptors on boot
+ if (CHECKPOINT_INTERVAL_MS > 0 && checkpointDir != null) {
checkpointer = new Timer();
checkpointer.schedule(new CheckpointTask(), 0, CHECKPOINT_INTERVAL_MS);
}
- } catch (IOException e)
- {
+ } catch (IOException e) {
log.info("failed to bind to socket; aborting agent launch", e);
throw new AlreadyRunningException();
}
}
+
// words should contain (space delimited):
// 0) command ("add")
// 1) AdaptorClassname
@@ -273,35 +257,37 @@
// but can be arbitrarily many space
// delimited agent specific params )
// 4) offset
- Pattern addCmdPattern = Pattern.compile(
- "[aA][dD][dD]\\s+" //command "add", any case, plus at least one space
- + "(\\S+)\\s+" //the adaptor classname, plus at least one space
- + "(\\S+)\\s+" //datatype, plus at least one space
- + "(?:" //start a non-capturing group, for the parameters
- + "(.*?)\\s+" //capture the actual parameters reluctantly, followed by whitespace
- + ")?" //end non-matching group for params; group is optional
- + "(\\d+)\\s*"); // finally, an offset and some trailing whitespace
+ Pattern addCmdPattern = Pattern.compile("[aA][dD][dD]\\s+" // command "add",
+ // any case, plus
+ // at least one
+ // space
+ + "(\\S+)\\s+" // the adaptor classname, plus at least one space
+ + "(\\S+)\\s+" // datatype, plus at least one space
+ + "(?:" // start a non-capturing group, for the parameters
+ + "(.*?)\\s+" // capture the actual parameters reluctantly, followed by
+ // whitespace
+ + ")?" // end non-matching group for params; group is optional
+ + "(\\d+)\\s*"); // finally, an offset and some trailing whitespace
- public long processCommand(String cmd)
- {
+ public long processCommand(String cmd) {
Matcher m = addCmdPattern.matcher(cmd);
if (m.matches()) {
- long offset; //check for obvious errors first
+ long offset; // check for obvious errors first
try {
offset = Long.parseLong(m.group(4));
- } catch (NumberFormatException e) {
+ } catch (NumberFormatException e) {
log.warn("malformed line " + cmd);
return -1L;
}
-
+
String adaptorName = m.group(1);
String dataType = m.group(2);
String params = m.group(3);
- if(params == null)
+ if (params == null)
params = "";
Adaptor adaptor = AdaptorFactory.createAdaptor(adaptorName);
- if (adaptor == null) {
+ if (adaptor == null) {
log.warn("Error creating adaptor from adaptor name " + adaptorName);
return -1L;
}
@@ -318,24 +304,21 @@
adaptorsByNumber.put(adaptorID, adaptor);
adaptorPositions.put(adaptor, new Offset(offset, adaptorID));
needNewCheckpoint = true;
- try
- {
+ try {
adaptor.start(adaptorID, dataType, params, offset, DataFactory
.getInstance().getEventQueue());
log.info("started a new adaptor, id = " + adaptorID);
return adaptorID;
- } catch (Exception e)
- {
+ } catch (Exception e) {
log.warn("failed to start adaptor", e);
// FIXME: don't we need to clean up the adaptor maps here?
}
}
- } else
- if(cmd.length() > 0)
- log.warn("only 'add' command supported in config files; cmd was: "+ cmd);
- //no warning for blank line
-
+ } else if (cmd.length() > 0)
+ log.warn("only 'add' command supported in config files; cmd was: " + cmd);
+ // no warning for blank line
+
return -1;
}
@@ -351,24 +334,19 @@
* @return true if the restore succeeded
* @throws IOException
*/
- public boolean restoreFromCheckpoint() throws IOException
- {
- synchronized (checkpointDir)
- {
- String[] checkpointNames = checkpointDir.list(new FilenameFilter()
- {
- public boolean accept(File dir, String name)
- {
+ public boolean restoreFromCheckpoint() throws IOException {
+ synchronized (checkpointDir) {
+ String[] checkpointNames = checkpointDir.list(new FilenameFilter() {
+ public boolean accept(File dir, String name) {
return name.startsWith(CHECKPOINT_BASE_NAME);
}
});
-
+
if (checkpointNames == null) {
log.error("Unable to list files in checkpoint dir");
return false;
}
- if (checkpointNames.length == 0)
- {
+ if (checkpointNames.length == 0) {
log.info("No checkpoints found in " + checkpointDir);
return false;
}
@@ -381,18 +359,16 @@
String lowestName = null;
int lowestIndex = Integer.MAX_VALUE;
- for (String n : checkpointNames)
- {
+ for (String n : checkpointNames) {
int index = Integer
.parseInt(n.substring(CHECKPOINT_BASE_NAME.length()));
- if (index < lowestIndex)
- {
+ if (index < lowestIndex) {
lowestName = n;
lowestIndex = index;
}
}
- checkpointNumber = lowestIndex+1;
+ checkpointNumber = lowestIndex + 1;
File checkpoint = new File(checkpointDir, lowestName);
readAdaptorsFile(checkpoint);
}
@@ -400,8 +376,7 @@
}
private void readAdaptorsFile(File checkpoint) throws FileNotFoundException,
- IOException
- {
+ IOException {
BufferedReader br = new BufferedReader(new InputStreamReader(
new FileInputStream(checkpoint)));
String cmd = null;
@@ -448,17 +423,15 @@
public void reportCommit(Adaptor src, long uuid) {
needNewCheckpoint = true;
Offset o = adaptorPositions.get(src);
- if (o != null)
- {
- synchronized (o)
- { // order writes to offset, in case commits are processed out of order
+ if (o != null) {
+ synchronized (o) { // order writes to offset, in case commits are
+ // processed out of order
if (uuid > o.offset)
o.offset = uuid;
}
log.info("got commit up to " + uuid + " on " + src + " = " + o.id);
- } else
- {
+ } else {
log.warn("got commit up to " + uuid + " for adaptor " + src
+ " that doesn't appear to be running: " + adaptorsByNumber.size()
+ " total");
@@ -466,16 +439,12 @@
}
class CheckpointTask extends TimerTask {
- public void run()
- {
- try
- {
- if (needNewCheckpoint)
- {
+ public void run() {
+ try {
+ if (needNewCheckpoint) {
writeCheckpoint();
}
- } catch (IOException e)
- {
+ } catch (IOException e) {
log.warn("failed to write checkpoint", e);
}
}
@@ -494,10 +463,8 @@
* If the adaptor is written correctly, its offset won't change after
* returning from shutdown.
*
- * @param number
- * the adaptor to stop
- * @param gracefully
- * if true, shutdown, if false, hardStop
+ * @param number the adaptor to stop
+ * @param gracefully if true, shutdown, if false, hardStop
* @return the number of bytes synched at stop. -1 on error
*/
public long stopAdaptor(long number, boolean gracefully) {
@@ -516,19 +483,21 @@
} else {
adaptorPositions.remove(toStop);
}
-
- try {
+
+ try {
if (gracefully) {
- offset = toStop.shutdown();
- log.info("shutdown on adaptor: " + number + ", " + toStop.getCurrentStatus());
- } else {
+ offset = toStop.shutdown();
+ log.info("shutdown on adaptor: " + number + ", "
+ + toStop.getCurrentStatus());
+ } else {
toStop.hardStop();
- log.info("hardStop on adaptorId: " + number + ", " + toStop.getCurrentStatus());
+ log.info("hardStop on adaptorId: " + number + ", "
+ + toStop.getCurrentStatus());
}
} catch (AdaptorException e) {
log.error("adaptor failed to stop cleanly", e);
} finally {
- needNewCheckpoint = true;
+ needNewCheckpoint = true;
}
return offset;
}
@@ -558,13 +527,15 @@
chukwaConf = new File(chukwaConfName).getAbsoluteFile();
else
chukwaConf = new File(chukwaHome, "conf");
-
+
log.info("Config - CHUKWA_CONF_DIR: [" + chukwaConf.toString() + "]");
- File agentConf = new File(chukwaConf,"chukwa-agent-conf.xml");
+ File agentConf = new File(chukwaConf, "chukwa-agent-conf.xml");
conf.addResource(new Path(agentConf.getAbsolutePath()));
- if(conf.get("chukwaAgent.checkpoint.dir") == null)
- conf.set("chukwaAgent.checkpoint.dir", new File(chukwaHome, "var").getAbsolutePath());
- conf.set("chukwaAgent.initial_adaptors", new File(chukwaConf, "initial_adaptors").getAbsolutePath());
+ if (conf.get("chukwaAgent.checkpoint.dir") == null)
+ conf.set("chukwaAgent.checkpoint.dir", new File(chukwaHome, "var")
+ .getAbsolutePath());
+ conf.set("chukwaAgent.initial_adaptors", new File(chukwaConf,
+ "initial_adaptors").getAbsolutePath());
return conf;
}
@@ -581,20 +552,19 @@
if (checkpointer != null) {
checkpointer.cancel();
try {
- if(needNewCheckpoint)
+ if (needNewCheckpoint)
writeCheckpoint(); // write a last checkpoint here, before stopping
} catch (IOException e) {
}
}
- // adaptors
+ // adaptors
- synchronized (adaptorsByNumber) {
+ synchronized (adaptorsByNumber) {
// shut down each adaptor
for (Adaptor a : adaptorsByNumber.values()) {
try {
a.hardStop();
- } catch (AdaptorException e)
- {
+ } catch (AdaptorException e) {
log.warn("failed to cleanly stop " + a, e);
}
}
@@ -608,8 +578,7 @@
/**
* Returns the last offset at which a given adaptor was checkpointed
*
- * @param a
- * the adaptor in question
+ * @param a the adaptor in question
* @return that adaptor's last-checkpointed offset
*/
public long getOffset(Adaptor a) {
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java Wed Mar 11 22:39:26 2009
@@ -18,10 +18,10 @@
package org.apache.hadoop.chukwa.datacollection.agent;
+
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
-
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
import org.apache.log4j.Logger;
@@ -29,75 +29,71 @@
/**
* An event queue that blocks once a fixed upper limit of data is enqueued.
*
- * For now, uses the size of the data field. Should really use estimatedSerializedSize()?
+ * For now, uses the size of the data field. Should really use
+ * estimatedSerializedSize()?
*
*/
-public class MemLimitQueue implements ChunkQueue
-{
- static Logger log = Logger.getLogger(WaitingQueue.class);
-
- private Queue<Chunk> queue = new LinkedList<Chunk>();
- private long dataSize = 0;
- private final long MAX_MEM_USAGE;
+public class MemLimitQueue implements ChunkQueue {
+ static Logger log = Logger.getLogger(WaitingQueue.class);
+ private Queue<Chunk> queue = new LinkedList<Chunk>();
+ private long dataSize = 0;
+ private final long MAX_MEM_USAGE;
-
public MemLimitQueue(int limit) {
MAX_MEM_USAGE = limit;
}
-
- /**
- * @see org.apache.hadoop.chukwa.datacollection.ChunkQueue#add(org.apache.hadoop.chukwa.Chunk)
- */
- public void add(Chunk chunk) throws InterruptedException
- {
- assert chunk != null: "can't enqueue null chunks";
- synchronized(this) {
- while(chunk.getData().length + dataSize > MAX_MEM_USAGE)
- {
- try
- {
- this.wait();
- log.info("MemLimitQueue is full [" + dataSize +"]");
- }
- catch(InterruptedException e) {}
+
+ /**
+ * @see org.apache.hadoop.chukwa.datacollection.ChunkQueue#add(org.apache.hadoop.chukwa.Chunk)
+ */
+ public void add(Chunk chunk) throws InterruptedException {
+ assert chunk != null : "can't enqueue null chunks";
+ synchronized (this) {
+ while (chunk.getData().length + dataSize > MAX_MEM_USAGE) {
+ try {
+ this.wait();
+ log.info("MemLimitQueue is full [" + dataSize + "]");
+ } catch (InterruptedException e) {
+ }
}
dataSize += chunk.getData().length;
queue.add(chunk);
this.notifyAll();
}
-
- }
- /**
- * @see org.apache.hadoop.chukwa.datacollection.ChunkQueue#collect(java.util.List, int)
- */
- public void collect(List<Chunk> events,int maxSize) throws InterruptedException
- {
- synchronized(this) {
- //we can't just say queue.take() here, since we're holding a lock.
- while(queue.isEmpty()){
- this.wait();
- }
-
-
- int size = 0;
- while(!queue.isEmpty() && (size < maxSize)) {
- Chunk e = this.queue.remove();
- int chunkSize = e.getData().length;
- size += chunkSize;
- dataSize -= chunkSize;
- events.add(e);
- }
- this.notifyAll();
- }
-
- if (log.isDebugEnabled()) {
- log.debug("WaitingQueue.inQueueCount:" + queue.size() + "\tWaitingQueue.collectCount:" + events.size());
- }
- }
-
- public int size(){
- return queue.size();
- }
+ }
+
+ /**
+ * @see org.apache.hadoop.chukwa.datacollection.ChunkQueue#collect(java.util.List,
+ * int)
+ */
+ public void collect(List<Chunk> events, int maxSize)
+ throws InterruptedException {
+ synchronized (this) {
+ // we can't just say queue.take() here, since we're holding a lock.
+ while (queue.isEmpty()) {
+ this.wait();
+ }
+
+ int size = 0;
+ while (!queue.isEmpty() && (size < maxSize)) {
+ Chunk e = this.queue.remove();
+ int chunkSize = e.getData().length;
+ size += chunkSize;
+ dataSize -= chunkSize;
+ events.add(e);
+ }
+ this.notifyAll();
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("WaitingQueue.inQueueCount:" + queue.size()
+ + "\tWaitingQueue.collectCount:" + events.size());
+ }
+ }
+
+ public int size() {
+ return queue.size();
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/WaitingQueue.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/WaitingQueue.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/WaitingQueue.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/WaitingQueue.java Wed Mar 11 22:39:26 2009
@@ -18,57 +18,50 @@
package org.apache.hadoop.chukwa.datacollection.agent;
+
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
import org.apache.log4j.Logger;
-public class WaitingQueue implements ChunkQueue
-{
+public class WaitingQueue implements ChunkQueue {
+
+ static Logger log = Logger.getLogger(WaitingQueue.class);
+ private BlockingQueue<Chunk> queue = new LinkedBlockingQueue<Chunk>(5);
- static Logger log = Logger.getLogger(WaitingQueue.class);
- private BlockingQueue<Chunk> queue = new LinkedBlockingQueue<Chunk>(5);
-
- public void add(Chunk event)
- {
- try
- {
- this.queue.put(event);
- }
- catch(InterruptedException e)
- {}//return upwards
- }
-
- public void add(List<Chunk> events)
- {
- this.queue.addAll(events);
-
- }
-
- public void collect(List<Chunk> events,int maxCount)
- {
- // Workaround to block on the queue
- try
- {
- events.add(this.queue.take());
- }
- catch (InterruptedException e)
- {}
- this.queue.drainTo(events,maxCount-1);
-
- System.out.println("collect [" + Thread.currentThread().getName() + "] [" + events.size() + "]");
-
- if (log.isDebugEnabled())
- {
- log.debug("WaitingQueue.inQueueCount:" + queue.size() + "\tWaitingQueue.collectCount:" + events.size());
- }
- }
-
- public int size(){
- return queue.size();
- }
+ public void add(Chunk event) {
+ try {
+ this.queue.put(event);
+ } catch (InterruptedException e) {
+ }// return upwards
+ }
+
+ public void add(List<Chunk> events) {
+ this.queue.addAll(events);
+
+ }
+
+ public void collect(List<Chunk> events, int maxCount) {
+ // Workaround to block on the queue
+ try {
+ events.add(this.queue.take());
+ } catch (InterruptedException e) {
+ }
+ this.queue.drainTo(events, maxCount - 1);
+
+ System.out.println("collect [" + Thread.currentThread().getName() + "] ["
+ + events.size() + "]");
+
+ if (log.isDebugEnabled()) {
+ log.debug("WaitingQueue.inQueueCount:" + queue.size()
+ + "\tWaitingQueue.collectCount:" + events.size());
+ }
+ }
+
+ public int size() {
+ return queue.size();
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java Wed Mar 11 22:39:26 2009
@@ -18,6 +18,7 @@
package org.apache.hadoop.chukwa.datacollection.collector;
+
import org.mortbay.jetty.*;
import org.mortbay.jetty.nio.SelectChannelConnector;
import org.mortbay.jetty.servlet.*;
@@ -27,74 +28,73 @@
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
public class CollectorStub {
-
+
static int THREADS = 80;
private static PidFile pFile = null;
public static Server jettyServer = null;
+
public static void main(String[] args) {
-
- pFile=new PidFile("Collector");
- Runtime.getRuntime().addShutdownHook(pFile);
+
+ pFile = new PidFile("Collector");
+ Runtime.getRuntime().addShutdownHook(pFile);
try {
- if(args.length>=1 && args[0].equalsIgnoreCase("-help")) {
+ if (args.length >= 1 && args[0].equalsIgnoreCase("-help")) {
System.out.println("usage: CollectorStub [portno] [pretend]");
- System.out.println("note: if no portno defined, " +
- "defaults to value in chukwa-site.xml");
+ System.out.println("note: if no portno defined, "
+ + "defaults to value in chukwa-site.xml");
System.exit(0);
}
-
+
ChukwaConfiguration conf = new ChukwaConfiguration();
int portNum = conf.getInt("chukwaCollector.http.port", 9999);
THREADS = conf.getInt("chukwaCollector.http.threads", 80);
-
- if(args.length != 0)
+
+ if (args.length != 0)
portNum = Integer.parseInt(args[0]);
-
- //pick a writer.
+
+ // pick a writer.
ChukwaWriter w = null;
- if(args.length > 1) {
- if(args[1].equals("pretend"))
- w= new ConsoleWriter(true);
- else if(args[1].equals("pretend-quietly"))
+ if (args.length > 1) {
+ if (args[1].equals("pretend"))
+ w = new ConsoleWriter(true);
+ else if (args[1].equals("pretend-quietly"))
w = new ConsoleWriter(false);
- else if(args[1].equals("-classname")) {
- if(args.length < 3)
+ else if (args[1].equals("-classname")) {
+ if (args.length < 3)
System.err.println("need to specify a writer class");
else {
conf.set("chukwaCollector.writerClass", args[2]);
}
- }
- else
- System.out.println("WARNING: unknown command line arg "+ args[1]);
+ } else
+ System.out.println("WARNING: unknown command line arg " + args[1]);
}
- if(w != null) {
+ if (w != null) {
w.init(conf);
ServletCollector.setWriter(w);
}
-
- //set up jetty connector
+
+ // set up jetty connector
SelectChannelConnector jettyConnector = new SelectChannelConnector();
- jettyConnector.setLowResourcesConnections(THREADS-10);
+ jettyConnector.setLowResourcesConnections(THREADS - 10);
jettyConnector.setLowResourceMaxIdleTime(1500);
jettyConnector.setPort(portNum);
- //set up jetty server
+ // set up jetty server
jettyServer = new Server(portNum);
-
- jettyServer.setConnectors(new Connector[]{ jettyConnector});
- org.mortbay.thread.BoundedThreadPool pool =
- new org.mortbay.thread.BoundedThreadPool();
+
+ jettyServer.setConnectors(new Connector[] { jettyConnector });
+ org.mortbay.thread.BoundedThreadPool pool = new org.mortbay.thread.BoundedThreadPool();
pool.setMaxThreads(THREADS);
jettyServer.setThreadPool(pool);
- //and add the servlet to it
- Context root = new Context(jettyServer,"/",Context.SESSIONS);
+ // and add the servlet to it
+ Context root = new Context(jettyServer, "/", Context.SESSIONS);
root.addServlet(new ServletHolder(new ServletCollector(conf)), "/*");
jettyServer.start();
jettyServer.setStopAtShutdown(false);
-
+
System.out.println("started http collector on port number " + portNum);
- } catch(Exception e) {
- e.printStackTrace();
+ } catch (Exception e) {
+ e.printStackTrace();
System.exit(0);
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java?rev=752666&r1=752665&r2=752666&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java Wed Mar 11 22:39:26 2009
@@ -18,219 +18,205 @@
package org.apache.hadoop.chukwa.datacollection.collector.servlet;
+
import java.io.DataInputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.*;
-
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.chukwa.datacollection.writer.*;
import org.apache.log4j.Logger;
-public class ServletCollector extends HttpServlet
-{
+public class ServletCollector extends HttpServlet {
static final boolean FANCY_DIAGNOSTICS = false;
- static org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter writer = null;
-
+ static org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter writer = null;
+
private static final long serialVersionUID = 6286162898591407111L;
- Logger log = Logger.getRootLogger();//.getLogger(ServletCollector.class);
-
- public static void setWriter(org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter w) throws WriterException
- {
- writer = w;
- }
- static long statTime = 0L;
- static int numberHTTPConnection = 0;
- static int numberchunks = 0;
- static long lifetimechunks=0;
-
- Configuration conf;
-
+ Logger log = Logger.getRootLogger();// .getLogger(ServletCollector.class);
+
+ public static void setWriter(
+ org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter w)
+ throws WriterException {
+ writer = w;
+ }
+
+ static long statTime = 0L;
+ static int numberHTTPConnection = 0;
+ static int numberchunks = 0;
+ static long lifetimechunks = 0;
+
+ Configuration conf;
+
public ServletCollector(Configuration c) {
- conf =c;
+ conf = c;
}
-
- public void init(ServletConfig servletConf) throws ServletException
- {
-
- log.info("initing servletCollector");
- if(servletConf == null) {
- log.fatal("no servlet config");
- return;
- }
-
- Timer statTimer = new Timer();
- statTimer.schedule(new TimerTask()
- {
- public void run()
- {
- log.info("stats:ServletCollector,numberHTTPConnection:" + numberHTTPConnection
- + ",numberchunks:"+numberchunks);
- statTime = System.currentTimeMillis();
- numberHTTPConnection = 0;
- numberchunks = 0;
- }
- }, (1000), (60*1000));
-
- if(writer != null) {
- log.info("writer set up statically, no need for Collector.init() to do it");
- return;
- }
-
- try {
- String writerClassName = conf.get("chukwaCollector.writerClass",
- SeqFileWriter.class.getCanonicalName());
- Class<?> writerClass = Class.forName(writerClassName);
- if(writerClass != null &&ChukwaWriter.class.isAssignableFrom(writerClass))
- writer = (ChukwaWriter) writerClass.newInstance();
- } catch(Exception e) {
- log.warn("failed to use user-chosen writer class, defaulting to SeqFileWriter", e);
- }
-
- //We default to here if the pipeline construction failed or didn't happen.
- try{
- if(writer == null)
- writer = new SeqFileWriter();//default to SeqFileWriter
+ public void init(ServletConfig servletConf) throws ServletException {
+
+ log.info("initing servletCollector");
+ if (servletConf == null) {
+ log.fatal("no servlet config");
+ return;
+ }
+
+ Timer statTimer = new Timer();
+ statTimer.schedule(new TimerTask() {
+ public void run() {
+ log.info("stats:ServletCollector,numberHTTPConnection:"
+ + numberHTTPConnection + ",numberchunks:" + numberchunks);
+ statTime = System.currentTimeMillis();
+ numberHTTPConnection = 0;
+ numberchunks = 0;
+ }
+ }, (1000), (60 * 1000));
+
+ if (writer != null) {
+ log
+ .info("writer set up statically, no need for Collector.init() to do it");
+ return;
+ }
+
+ try {
+ String writerClassName = conf.get("chukwaCollector.writerClass",
+ SeqFileWriter.class.getCanonicalName());
+ Class<?> writerClass = Class.forName(writerClassName);
+ if (writerClass != null
+ && ChukwaWriter.class.isAssignableFrom(writerClass))
+ writer = (ChukwaWriter) writerClass.newInstance();
+ } catch (Exception e) {
+ log
+ .warn(
+ "failed to use user-chosen writer class, defaulting to SeqFileWriter",
+ e);
+ }
+
+ // We default to here if the pipeline construction failed or didn't happen.
+ try {
+ if (writer == null)
+ writer = new SeqFileWriter();// default to SeqFileWriter
writer.init(conf);
- } catch (WriterException e) {
- throw new ServletException("Problem init-ing servlet", e);
- }
- }
-
- protected void accept(HttpServletRequest req, HttpServletResponse resp)
- throws ServletException
- {
- numberHTTPConnection ++;
- ServletDiagnostics diagnosticPage = new ServletDiagnostics();
- final long currentTime = System.currentTimeMillis();
- try {
-
- log.debug("new post from " + req.getRemoteHost() + " at " + currentTime);
- java.io.InputStream in = req.getInputStream();
-
- ServletOutputStream l_out = resp.getOutputStream();
- final DataInputStream di = new DataInputStream(in);
- final int numEvents = di.readInt();
- // log.info("saw " + numEvents+ " in request");
-
- if(FANCY_DIAGNOSTICS)
- { diagnosticPage.sawPost(req.getRemoteHost(), numEvents, currentTime); }
-
- List<Chunk> events = new LinkedList<Chunk>();
- ChunkImpl logEvent = null;
- StringBuilder sb = new StringBuilder();
-
- for (int i = 0; i < numEvents; i++)
- {
- // TODO: pass new data to all registered stream handler
- // methods for this chunk's stream
- // TODO: should really have some dynamic assignment of events to writers
-
- logEvent = ChunkImpl.read(di);
- sb.append("ok:");
- sb.append(logEvent.getData().length);
- sb.append(" bytes ending at offset ");
- sb.append(logEvent.getSeqID()-1).append("\n");
-
- events.add(logEvent);
-
- if(FANCY_DIAGNOSTICS)
- { diagnosticPage.sawChunk(logEvent, i); }
- }
-
- // write new data to data sync file
- if(writer != null)
- {
- writer.add(events);
- numberchunks += events.size();
- lifetimechunks += events.size();
- //this is where we ACK this connection
- l_out.print(sb.toString());
- }
- else
- {
- l_out.println("can't write: no writer");
- }
-
-
- if(FANCY_DIAGNOSTICS)
- { diagnosticPage.doneWithPost(); }
-
- resp.setStatus(200);
-
- }
- catch(Throwable e)
- {
- log.warn("Exception talking to " +req.getRemoteHost() + " at " + currentTime , e);
- throw new ServletException(e);
- }
- }
-
-
- @Override
- protected void doPost(HttpServletRequest req, HttpServletResponse resp)
- throws ServletException, IOException
- {
- accept(req,resp);
- }
-
- @Override
- protected void doGet(HttpServletRequest req, HttpServletResponse resp)
- throws ServletException, IOException
- {
-
- PrintStream out = new PrintStream(resp.getOutputStream());
- resp.setStatus(200);
-
- String pingAtt = req.getParameter("ping");
- if (pingAtt!=null)
- {
- out.println("Date:" + ServletCollector.statTime);
- out.println("Now:" + System.currentTimeMillis());
- out.println("numberHTTPConnection:" + ServletCollector.numberHTTPConnection);
- out.println("numberchunks:" + ServletCollector.numberchunks);
+ } catch (WriterException e) {
+ throw new ServletException("Problem init-ing servlet", e);
+ }
+ }
+
+ protected void accept(HttpServletRequest req, HttpServletResponse resp)
+ throws ServletException {
+ numberHTTPConnection++;
+ ServletDiagnostics diagnosticPage = new ServletDiagnostics();
+ final long currentTime = System.currentTimeMillis();
+ try {
+
+ log.debug("new post from " + req.getRemoteHost() + " at " + currentTime);
+ java.io.InputStream in = req.getInputStream();
+
+ ServletOutputStream l_out = resp.getOutputStream();
+ final DataInputStream di = new DataInputStream(in);
+ final int numEvents = di.readInt();
+ // log.info("saw " + numEvents+ " in request");
+
+ if (FANCY_DIAGNOSTICS) {
+ diagnosticPage.sawPost(req.getRemoteHost(), numEvents, currentTime);
+ }
+
+ List<Chunk> events = new LinkedList<Chunk>();
+ ChunkImpl logEvent = null;
+ StringBuilder sb = new StringBuilder();
+
+ for (int i = 0; i < numEvents; i++) {
+ // TODO: pass new data to all registered stream handler
+ // methods for this chunk's stream
+ // TODO: should really have some dynamic assignment of events to writers
+
+ logEvent = ChunkImpl.read(di);
+ sb.append("ok:");
+ sb.append(logEvent.getData().length);
+ sb.append(" bytes ending at offset ");
+ sb.append(logEvent.getSeqID() - 1).append("\n");
+
+ events.add(logEvent);
+
+ if (FANCY_DIAGNOSTICS) {
+ diagnosticPage.sawChunk(logEvent, i);
+ }
+ }
+
+ // write new data to data sync file
+ if (writer != null) {
+ writer.add(events);
+ numberchunks += events.size();
+ lifetimechunks += events.size();
+ // this is where we ACK this connection
+ l_out.print(sb.toString());
+ } else {
+ l_out.println("can't write: no writer");
+ }
+
+ if (FANCY_DIAGNOSTICS) {
+ diagnosticPage.doneWithPost();
+ }
+
+ resp.setStatus(200);
+
+ } catch (Throwable e) {
+ log.warn("Exception talking to " + req.getRemoteHost() + " at "
+ + currentTime, e);
+ throw new ServletException(e);
+ }
+ }
+
+ @Override
+ protected void doPost(HttpServletRequest req, HttpServletResponse resp)
+ throws ServletException, IOException {
+ accept(req, resp);
+ }
+
+ @Override
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+ throws ServletException, IOException {
+
+ PrintStream out = new PrintStream(resp.getOutputStream());
+ resp.setStatus(200);
+
+ String pingAtt = req.getParameter("ping");
+ if (pingAtt != null) {
+ out.println("Date:" + ServletCollector.statTime);
+ out.println("Now:" + System.currentTimeMillis());
+ out.println("numberHTTPConnection:"
+ + ServletCollector.numberHTTPConnection);
+ out.println("numberchunks:" + ServletCollector.numberchunks);
out.println("lifetimechunks:" + ServletCollector.lifetimechunks);
- }
- else
- {
- out.println("<html><body><h2>Chukwa servlet running</h2>");
- if(FANCY_DIAGNOSTICS)
- ServletDiagnostics.printPage(out);
- out.println("</body></html>");
- }
-
-
- }
-
- @Override
- public String getServletInfo()
- {
- return "Chukwa Servlet Collector";
- }
-
- @Override
- public void destroy()
- {
- try
- {
- writer.close();
- } catch (WriterException e)
- {
- log.warn("Exception during close", e);
- e.printStackTrace();
- }
- super.destroy();
- }
+ } else {
+ out.println("<html><body><h2>Chukwa servlet running</h2>");
+ if (FANCY_DIAGNOSTICS)
+ ServletDiagnostics.printPage(out);
+ out.println("</body></html>");
+ }
+
+ }
+
+ @Override
+ public String getServletInfo() {
+ return "Chukwa Servlet Collector";
+ }
+
+ @Override
+ public void destroy() {
+ try {
+ writer.close();
+ } catch (WriterException e) {
+ log.warn("Exception during close", e);
+ e.printStackTrace();
+ }
+ super.destroy();
+ }
}