You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by as...@apache.org on 2009/12/21 22:05:18 UTC
svn commit: r892976 - in /hadoop/chukwa/trunk: ./
src/java/org/apache/hadoop/chukwa/datacollection/adaptor/
src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/
src/java/org/apache/hadoop/chukwa/datacollection/agent/
src/java/org/apache...
Author: asrabkin
Date: Mon Dec 21 21:05:17 2009
New Revision: 892976
URL: http://svn.apache.org/viewvc?rev=892976&view=rev
Log:
CHUKWA-421. Use modification time to detect rotation.
Added:
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java
Modified:
hadoop/chukwa/trunk/CHANGES.txt
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestLogRotate.java
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java
Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=892976&r1=892975&r2=892976&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Mon Dec 21 21:05:17 2009
@@ -18,6 +18,8 @@
IMPROVEMENTS
+ CHUKWA-421. Use modification time to detect rotation. (asrabkin)
+
CHUKWA-432. PipelineableWriter becomes an abstract class. (asrabkin)
CHUKWA-429. Update HDFS heatmap color with rainbow colors. (Eric Yang)
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java?rev=892976&r1=892975&r2=892976&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java Mon Dec 21 21:05:17 2009
@@ -35,11 +35,10 @@
@Override
public final void start(String adaptorID, String type, long offset,
- ChunkReceiver dest, AdaptorManager c) throws AdaptorException {
+ ChunkReceiver dest) throws AdaptorException {
this.adaptorID = adaptorID;
this.type = type;
this.dest=dest;
- control = c;
start(offset);
}
@@ -50,7 +49,8 @@
control.stopAdaptor(adaptorID, gracefully);
}
- public String parseArgs(String d, String s) {
+ public String parseArgs(String d, String s, AdaptorManager c) {
+ control = c;
return parseArgs(s);
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java?rev=892976&r1=892975&r2=892976&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java Mon Dec 21 21:05:17 2009
@@ -13,6 +13,7 @@
String innerClassName;
String innerType;
ChunkReceiver dest;
+ AdaptorManager manager;
@Override
public String getCurrentStatus() {
@@ -30,14 +31,15 @@
* Note that the name of the inner class will get parsed out as a type
*/
@Override
- public String parseArgs(String innerClassName, String params) {
+ public String parseArgs(String innerClassName, String params, AdaptorManager a) {
+ manager = a;
Matcher m = p.matcher(params);
this.innerClassName = innerClassName;
String innerCoreParams;
if(m.matches()) {
innerType = m.group(1);
inner = AdaptorFactory.createAdaptor(innerClassName);
- innerCoreParams = inner.parseArgs(innerType,m.group(2));
+ innerCoreParams = inner.parseArgs(innerType,m.group(2),a);
return innerClassName + innerCoreParams;
}
else return null;
@@ -64,10 +66,10 @@
*/
@Override
public void start(String adaptorID, String type, long offset,
- ChunkReceiver dest, AdaptorManager c) throws AdaptorException {
+ ChunkReceiver dest) throws AdaptorException {
String dummyAdaptorID = adaptorID;
this.dest = dest;
- inner.start(dummyAdaptorID, type, offset, this, c);
+ inner.start(dummyAdaptorID, type, offset, this);
}
@Override
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java?rev=892976&r1=892975&r2=892976&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java Mon Dec 21 21:05:17 2009
@@ -55,7 +55,7 @@
* @throws AdaptorException
*/
public void start(String adaptorID, String type, long offset,
- ChunkReceiver dest, AdaptorManager c) throws AdaptorException;
+ ChunkReceiver dest) throws AdaptorException;
/**
* Return the adaptor's state Should not include class name or byte
@@ -77,7 +77,7 @@
*
* @return Stream name as a string, null if params are malformed
*/
- public String parseArgs(String datatype, String params);
+ public String parseArgs(String datatype, String params, AdaptorManager c);
/**
* Signals this adaptor to come to an orderly stop. The adaptor ought to push
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java?rev=892976&r1=892975&r2=892976&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java Mon Dec 21 21:05:17 2009
@@ -63,7 +63,7 @@
@Override
public void start(String adaptorID, String type, long offset,
- ChunkReceiver dest, AdaptorManager manager) throws AdaptorException {
+ ChunkReceiver dest) throws AdaptorException {
try {
String dummyAdaptorID = adaptorID;
this.dest = dest;
@@ -81,7 +81,7 @@
for(Chunk c:myBuffer.chunks)
dest.add(c);
- inner.start(dummyAdaptorID, innerType, offset, this, manager);
+ inner.start(dummyAdaptorID, innerType, offset, this);
} catch(InterruptedException e) {
throw new AdaptorException(e);
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java?rev=892976&r1=892975&r2=892976&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java Mon Dec 21 21:05:17 2009
@@ -48,21 +48,14 @@
*/
int DEFAULT_SAMPLE_PERIOD_MS = 1000 * 2;
int SAMPLE_PERIOD_MS = DEFAULT_SAMPLE_PERIOD_MS;
- private static Configuration conf = null;
+// private Configuration conf = null;
public static final int MAX_SAMPLE_PERIOD = 60 * 1000;
- FileTailer() {
- if (conf == null) {
- ChukwaAgent agent = ChukwaAgent.getAgent();
- if (agent != null) {
- conf = agent.getConfiguration();
- if (conf != null) {
- SAMPLE_PERIOD_MS = conf.getInt(
- "chukwaAgent.adaptor.context.switch.time",
- DEFAULT_SAMPLE_PERIOD_MS);
- }
- }
- }
+ FileTailer(Configuration conf) {
+ // this.conf = conf;
+ SAMPLE_PERIOD_MS = conf.getInt(
+ "chukwaAgent.adaptor.context.switch.time",
+ DEFAULT_SAMPLE_PERIOD_MS);
eq = DataFactory.getInstance().getEventQueue();
// iterations are much more common than adding a new adaptor
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java?rev=892976&r1=892975&r2=892976&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java Mon Dec 21 21:05:17 2009
@@ -57,7 +57,7 @@
protected static FileTailer tailer;
static {
- tailer = new FileTailer();
+ tailer = null;
log = Logger.getLogger(FileTailingAdaptor.class);
}
@@ -77,9 +77,11 @@
@Override
public void start(long offset) {
- conf = control.getConfiguration();
- MAX_READ_SIZE = conf.getInt(MAX_READ_SIZE_OPT, DEFAULT_MAX_READ_SIZE);
- this.fileReadOffset = offset;
+ synchronized(LWFTAdaptor.class) {
+ if(tailer == null)
+ tailer = new FileTailer(control.getConfiguration());
+ }
+ this.fileReadOffset = offset - offsetOfFirstByte;
tailer.startWatchingFile(this);
}
@@ -97,12 +99,15 @@
public String getStreamName() {
return toWatch.getPath();
}
-
+
@Override
public String parseArgs(String params) {
+ conf = control.getConfiguration();
+ MAX_READ_SIZE = conf.getInt(MAX_READ_SIZE_OPT, DEFAULT_MAX_READ_SIZE);
+
Pattern cmd = Pattern.compile("(\\d+)\\s+(.+)\\s?");
Matcher m = cmd.matcher(params);
- if (m.matches()) {
+ if (m.matches()) { //check for first-byte offset. If absent, assume we just got a path.
offsetOfFirstByte = Long.parseLong(m.group(1));
toWatch = new File(m.group(2));
} else {
@@ -228,6 +233,7 @@
}
private void handleShrunkenFile(long measuredLen) {
+ log.info("file "+ toWatch +"shrank from " + fileReadOffset + " to " + measuredLen);
offsetOfFirstByte = measuredLen;
fileReadOffset = 0;
}
Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java?rev=892976&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java Mon Dec 21 21:05:17 2009
@@ -0,0 +1,164 @@
+package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.Collections;
+import java.util.Queue;
+import java.util.LinkedList;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+
+/**
+ * Checkpoint state:
+ * date modified of most-recently tailed file, offset of first byte of that file,
+ * then regular FTA args
+ *
+ */
+public class RCheckFTAdaptor extends LWFTAdaptor implements FileFilter {
+
+ private static class FPair implements Comparable<FPair> {
+ File f;
+ long mod;
+ FPair(File f) {
+ this.f = f;
+ mod = f.lastModified();
+ }
+ /**
+ * -1 implies this is LESS THAN o
+ */
+ @Override
+ public int compareTo(FPair o) {
+ if(mod < o.mod)
+ return -1;
+ else if (mod > o.mod)
+ return 1;
+ //want toWatch to be last
+ else return (o.f.getName().compareTo(f.getName()));//shouldn't happen?
+ }
+ }
+
+ long prevFileLastModDate = 0;
+ LinkedList<FPair> fileQ = new LinkedList<FPair>();
+ String fBaseName;
+ File cur; //this is the actual physical file being watched.
+ // in contrast, toWatch is the path specified by the user
+ boolean caughtUp = false;
+ /**
+ * Check for date-modified and offset; if absent assume we just got a name.
+ */
+ @Override
+ public String parseArgs(String params) {
+ Pattern cmd = Pattern.compile("d:(\\d+)\\s+(\\d+)\\s+(.+)\\s?");
+ Matcher m = cmd.matcher(params);
+ if (m.matches()) {
+ prevFileLastModDate = Long.parseLong(m.group(1));
+ offsetOfFirstByte = Long.parseLong(m.group(2));
+ toWatch = new File(m.group(3)).getAbsoluteFile();
+ } else {
+ toWatch = new File(params.trim()).getAbsoluteFile();
+ }
+ fBaseName = toWatch.getName();
+ return toWatch.getAbsolutePath();
+ }
+
+ public String getCurrentStatus() {
+ return type.trim() + " d:" + prevFileLastModDate + " " + offsetOfFirstByte + " " + toWatch.getPath();
+ }
+
+ @Override
+ public boolean accept(File pathname) {
+ return pathname.getName().startsWith(fBaseName) &&
+ ( pathname.getName().equals(fBaseName) ||
+ pathname.lastModified() > prevFileLastModDate);
+ }
+
+
+ protected void mkFileQ() {
+
+ File toWatchDir = toWatch.getParentFile();
+ File[] candidates = toWatchDir.listFiles(this);
+ if(candidates == null) {
+ log.error(toWatchDir + " is not a directory");
+ } else {
+ log.debug("saw " + candidates.length + " files matching pattern");
+ fileQ = new LinkedList<FPair>();
+ for(File f:candidates)
+ fileQ.add(new FPair(f));
+ Collections.sort(fileQ);
+ }
+ }
+
+ protected void advanceQ() {
+ FPair next = fileQ.poll();
+ if(next != null) {
+ cur = next.f;
+ caughtUp = toWatch.equals(cur);
+ if(caughtUp && !fileQ.isEmpty())
+ log.warn("expected rotation queue to be empty when caught up...");
+ }
+ else {
+ cur = null;
+ caughtUp = true;
+ }
+ }
+
+ @Override
+ public void start(long offset) {
+ mkFileQ(); //figure out what to watch
+ advanceQ();
+ super.start(offset);
+ }
+
+ @Override
+ public synchronized boolean tailFile(ChunkReceiver eq)
+ throws InterruptedException {
+ boolean hasMoreData = false;
+ try {
+
+ if(caughtUp) {
+ //we're caught up and watching an unrotated file
+ mkFileQ(); //figure out what to watch
+ advanceQ();
+ }
+ if(cur == null) //file we're watching doesn't exist
+ return false;
+
+ log.debug("treating " + cur + " as " + toWatch);
+
+ long len = cur.length();
+ long tsPreTail = cur.exists() ? cur.lastModified() : prevFileLastModDate;
+ if(len < fileReadOffset) {
+ log.info("file "+ cur +" shrank from " + fileReadOffset + " to " + len);
+ //no unseen changes to prev version, since mod date is older than last scan.
+ offsetOfFirstByte += fileReadOffset;
+ fileReadOffset = 0;
+ } else if(len > fileReadOffset) {
+ log.debug("slurping from " + cur+ " at offset " + fileReadOffset);
+ RandomAccessFile reader = new RandomAccessFile(cur, "r");
+ slurp(len, reader);
+ reader.close();
+ } else {
+ //we're either caught up or at EOF
+ if (!caughtUp) {
+ prevFileLastModDate = cur.lastModified();
+ //Hit EOF on an already-rotated file. Move on!
+ offsetOfFirstByte += fileReadOffset;
+ fileReadOffset = 0;
+ advanceQ();
+ log.debug("not caught up, and hit EOF. Moving forward in queue to " + cur);
+ } else
+ prevFileLastModDate = tsPreTail;
+
+ }
+
+ } catch(IOException e) {
+ log.warn("IOException in tailer", e);
+ deregisterAndStop(false);
+ }
+
+ return hasMoreData;
+ }
+}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java?rev=892976&r1=892975&r2=892976&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java Mon Dec 21 21:05:17 2009
@@ -263,7 +263,7 @@
public void tryToBind() throws IOException {
if(ALLOW_REMOTE)
s = new ServerSocket(portno);
- else {
+ else { //FIXME: is there a way to allow all local addresses? (including IPv6 local)
s = new ServerSocket();
s.bind(new InetSocketAddress(InetAddress.getByAddress(new byte[] {127,0,0,1}), portno));
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=892976&r1=892975&r2=892976&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java Mon Dec 21 21:05:17 2009
@@ -329,7 +329,7 @@
log.warn("Error creating adaptor of class " + adaptorClassName);
return null;
}
- String coreParams = adaptor.parseArgs(dataType,params);
+ String coreParams = adaptor.parseArgs(dataType,params,this);
if(coreParams == null) {
log.warn("invalid params for adaptor: " + params);
return null;
@@ -354,7 +354,7 @@
needNewCheckpoint = true;
try {
adaptor.start(adaptorID, dataType, offset, DataFactory
- .getInstance().getEventQueue(), this);
+ .getInstance().getEventQueue());
log.info("started a new adaptor, id = " + adaptorID + " function=["+adaptor.toString()+"]");
ChukwaAgent.agentMetrics.adaptorCount.set(adaptorsByName.size());
ChukwaAgent.agentMetrics.addedAdaptor.inc();
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java?rev=892976&r1=892975&r2=892976&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java Mon Dec 21 21:05:17 2009
@@ -71,8 +71,8 @@
File file = new File(logFile);
connector.start();
Adaptor adaptor = AdaptorFactory.createAdaptor(adaptorName);
- adaptor.parseArgs(recordType, "0 " +file.getAbsolutePath());
- adaptor.start("", recordType, 0l,queue, AdaptorManager.NULL );
+ adaptor.parseArgs(recordType, "0 " +file.getAbsolutePath(),AdaptorManager.NULL);
+ adaptor.start("", recordType, 0l,queue);
adaptor.shutdown(AdaptorShutdownPolicy.WAIT_TILL_FINISHED);
connector.shutdown();
file.renameTo(new File(logFile + ".sav"));
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java?rev=892976&r1=892975&r2=892976&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java Mon Dec 21 21:05:17 2009
@@ -41,7 +41,7 @@
}
public void start(String adaptorID, String type, long offset,
- ChunkReceiver dest, AdaptorManager c) throws AdaptorException {
+ ChunkReceiver dest) throws AdaptorException {
this.setName("MaxRateSender adaptor");
this.adaptorID = adaptorID;
this.offset = offset;
@@ -51,7 +51,7 @@
}
@Override
- public String parseArgs(String d, String s) {
+ public String parseArgs(String d, String s,AdaptorManager c) {
return s;
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java?rev=892976&r1=892975&r2=892976&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java Mon Dec 21 21:05:17 2009
@@ -109,6 +109,12 @@
return tmpOutput;
}
+
+ public static File makeTestFile(String name, int size) throws IOException {
+ return makeTestFile(name, size, new File(System.getProperty("test.build.data", "/tmp")));
+
+ }
+
public static File makeTestFile(File baseDir) throws IOException {
return makeTestFile("atemp",10, baseDir);
}
Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java?rev=892976&r1=892975&r2=892976&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java Mon Dec 21 21:05:17 2009
@@ -115,7 +115,7 @@
return false;
}
- private void createEmptyDir(File dir) {
+ public static void createEmptyDir(File dir) {
if(!nukeDirContents(dir))
dir.mkdir();
assertTrue(dir.isDirectory() && dir.listFiles().length == 0);
Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestLogRotate.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestLogRotate.java?rev=892976&r1=892975&r2=892976&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestLogRotate.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestLogRotate.java Mon Dec 21 21:05:17 2009
@@ -28,6 +28,7 @@
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
+import static org.apache.hadoop.chukwa.util.TempFileUtil.*;
public class TestLogRotate extends TestCase {
ChunkCatcherConnector chunks;
@@ -91,28 +92,5 @@
Thread.sleep(2000);
}
- private File makeTestFile(String name, int size) throws IOException {
- File tmpOutput = new File(System.getProperty("test.build.data", "/tmp"),
- name);
- FileOutputStream fos = new FileOutputStream(tmpOutput);
-
- PrintWriter pw = new PrintWriter(fos);
- for (int i = 0; i < size; ++i) {
- pw.print(i + " ");
- pw.println("abcdefghijklmnopqrstuvwxyz");
- }
- pw.flush();
- pw.close();
- return tmpOutput;
- }
-
- public static void main(String[] args) {
- try {
- TestLogRotate tests = new TestLogRotate();
- tests.testLogRotate();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
}
Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java?rev=892976&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java Mon Dec 21 21:05:17 2009
@@ -0,0 +1,93 @@
+package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
+import org.apache.hadoop.conf.Configuration;
+import junit.framework.TestCase;
+import org.apache.hadoop.chukwa.datacollection.adaptor.TestDirTailingAdaptor;
+import org.apache.log4j.Level;
+
+public class TestRCheckAdaptor extends TestCase {
+
+ ChunkCatcherConnector chunks;
+
+ public TestRCheckAdaptor() {
+ chunks = new ChunkCatcherConnector();
+ chunks.start();
+ }
+
+ public void testLogRotate() throws IOException, InterruptedException,
+ ChukwaAgent.AlreadyRunningException {
+ Configuration conf = new Configuration();
+ conf.set("chukwaAgent.control.port", "0");
+ conf.setInt("chukwaAgent.adaptor.context.switch.time", 100);
+
+// RCheckFTAdaptor.log.setLevel(Level.DEBUG);
+ File baseDir = new File(System.getProperty("test.build.data", "/tmp") + "/rcheck");
+ TestDirTailingAdaptor.createEmptyDir(baseDir);
+ File tmpOutput = new File(baseDir, "rotateTest.1");
+ PrintWriter pw = new PrintWriter(new FileOutputStream(tmpOutput));
+ pw.println("First");
+ pw.close();
+ Thread.sleep(1000);//to make sure mod dates are distinguishable.
+ tmpOutput = new File(baseDir, "rotateTest");
+ pw = new PrintWriter(new FileOutputStream(tmpOutput));
+ pw.println("Second");
+ pw.close();
+
+
+ ChukwaAgent agent = new ChukwaAgent(conf);
+ String adaptorID = agent.processAddCommand("add lr = filetailer.RCheckFTAdaptor test " + tmpOutput.getAbsolutePath() + " 0");
+ assertNotNull(adaptorID);
+
+ Chunk c = chunks.waitForAChunk(2000);
+ assertNotNull(c);
+ assertTrue(c.getData().length == 6);
+ assertTrue("First\n".equals(new String(c.getData())));
+ c = chunks.waitForAChunk(2000);
+ assertNotNull(c);
+ assertTrue(c.getData().length == 7);
+ assertTrue("Second\n".equals(new String(c.getData())));
+
+ pw = new PrintWriter(new FileOutputStream(tmpOutput, true));
+ pw.println("Third");
+ pw.close();
+ c = chunks.waitForAChunk(2000);
+
+ assertNotNull(c);
+ assertTrue(c.getData().length == 6);
+ assertTrue("Third\n".equals(new String(c.getData())));
+ Thread.sleep(1500);
+
+ tmpOutput.renameTo(new File(baseDir, "rotateTest.2"));
+ pw = new PrintWriter(new FileOutputStream(tmpOutput, true));
+ pw.println("Fourth");
+ pw.close();
+ c = chunks.waitForAChunk(2000);
+
+ assertNotNull(c);
+ System.out.println("got " + new String(c.getData()));
+ assertTrue("Fourth\n".equals(new String(c.getData())));
+
+ Thread.sleep(1500);
+
+ tmpOutput.renameTo(new File(baseDir, "rotateTest.3"));
+ Thread.sleep(400);
+ pw = new PrintWriter(new FileOutputStream(tmpOutput, true));
+ pw.println("Fifth");
+ pw.close();
+ c = chunks.waitForAChunk(2000);
+ assertNotNull(c);
+ System.out.println("got " + new String(c.getData()));
+ assertTrue("Fifth\n".equals(new String(c.getData())));
+
+ agent.shutdown();
+ Thread.sleep(2000);
+ }
+
+}
Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java?rev=892976&r1=892975&r2=892976&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java Mon Dec 21 21:05:17 2009
@@ -50,6 +50,12 @@
System.out.println("testing lightweight fta");
runTest("LWFTAdaptor");
}
+
+
+ public void testRotAdaptor() throws Exception {
+ System.out.println("testing lightweight fta");
+ runTest("LWFTAdaptor");
+ }
public void runTest(String name) throws IOException, InterruptedException,
ChukwaAgent.AlreadyRunningException {