You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by as...@apache.org on 2009/12/03 23:47:51 UTC
svn commit: r886969 - in /hadoop/chukwa/trunk: ./
src/java/org/apache/hadoop/chukwa/datacollection/adaptor/
src/java/org/apache/hadoop/chukwa/datacollection/agent/
src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/
src/java/org/apache/...
Author: asrabkin
Date: Thu Dec 3 22:47:48 2009
New Revision: 886969
URL: http://svn.apache.org/viewvc?rev=886969&view=rev
Log:
CHUKWA-395. Support for generalized buffering of adaptor output
Added:
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/NotifyOnCommitAdaptor.java
Modified:
hadoop/chukwa/trunk/CHANGES.txt
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/ChukwaTestAdaptor.java
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptors.java
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestDelayedAcks.java
Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Thu Dec 3 22:47:48 2009
@@ -4,6 +4,8 @@
NEW FEATURES
+ CHUKWA-395. Support for generalized buffering of adaptor data. (asrabkin)
+
CHUKWA-405 Add a "stop all" command. (asrabkin)
IMPROVEMENTS
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java Thu Dec 3 22:47:48 2009
@@ -44,8 +44,15 @@
}
public abstract void start(long offset) throws AdaptorException;
+ public abstract String parseArgs(String s);
public void deregisterAndStop(boolean gracefully) {
control.stopAdaptor(adaptorID, gracefully);
}
+
+  /**
+   * Two-argument form of parseArgs that ignores the datatype and delegates
+   * to the single-argument {@link #parseArgs(String)} implemented by
+   * subclasses.
+   *
+   * @param datatype the datatype from the add command; unused here
+   * @param params the adaptor-specific parameter string
+   * @return whatever parseArgs(params) returns
+   */
+  public String parseArgs(String datatype, String params) {
+    return parseArgs(params);
+  }
+
+
}
Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java?rev=886969&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java Thu Dec 3 22:47:48 2009
@@ -0,0 +1,81 @@
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+import java.util.*;
+import java.util.regex.*;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+import org.apache.hadoop.chukwa.datacollection.agent.AdaptorFactory;
+import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
+
+/**
+ * An adaptor that wraps another ("inner") adaptor and sits between it and
+ * the real chunk destination. Every Adaptor call is delegated to the inner
+ * adaptor; chunks the inner adaptor emits arrive at {@link #add(Chunk)} and
+ * are passed on to the destination given to start().
+ *
+ * The inner adaptor is created by parseArgs, so the delegating methods must
+ * not be called before a successful parseArgs.
+ */
+public class AbstractWrapper implements NotifyOnCommitAdaptor,ChunkReceiver {
+
+  Adaptor inner;
+  String innerClassName;
+  String innerType;
+  ChunkReceiver dest;
+
+  /** Splits the wrapper's params into "innerType" and "everything else". */
+  static final Pattern p = Pattern.compile("([^ ]+) +([^ ].*)");
+
+  @Override
+  public String getCurrentStatus() {
+    return innerClassName + " " + inner.getCurrentStatus();
+  }
+
+  @Override
+  public void hardStop() throws AdaptorException {
+    inner.hardStop();
+  }
+
+  /**
+   * Creates the inner adaptor and hands it its share of the parameters.
+   * Note that the name of the inner class will get parsed out as a type
+   *
+   * @param innerClassName class name of the adaptor to wrap
+   * @param params the inner adaptor's data type, one or more spaces, then
+   *        the inner adaptor's own parameters
+   * @return innerClassName followed by the inner adaptor's core params, or
+   *         null if params are malformed, the inner adaptor could not be
+   *         created, or the inner adaptor rejected its parameters
+   */
+  @Override
+  public String parseArgs(String innerClassName, String params) {
+    this.innerClassName = innerClassName;
+    Matcher m = p.matcher(params);
+    if(!m.matches())
+      return null;
+
+    innerType = m.group(1);
+    inner = AdaptorFactory.createAdaptor(innerClassName);
+    if(inner == null) //guard in case the factory could not build the adaptor
+      return null;
+
+    String innerCoreParams = inner.parseArgs(innerType, m.group(2));
+    if(innerCoreParams == null) //don't turn a rejection into "...null"
+      return null;
+    //NOTE(review): no separator between the two parts -- confirm intended
+    return innerClassName + innerCoreParams;
+  }
+
+  @Override
+  public long shutdown() throws AdaptorException {
+    return inner.shutdown();
+  }
+
+  @Override
+  public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
+      throws AdaptorException {
+    return inner.shutdown(shutdownPolicy);
+  }
+
+  /**
+   * @return the data type of the inner adaptor, as parsed by parseArgs
+   */
+  @Override
+  public String getType() {
+    return innerType;
+  }
+
+  /**
+   * Starts the inner adaptor with this wrapper as its chunk receiver.
+   * The inner adaptor is started with the type parsed out by parseArgs
+   * (the value getType() reports), not with the type argument passed here.
+   *
+   * @param adaptorID ID passed through unchanged to the inner adaptor
+   * @param type ignored; innerType is used instead
+   * @param offset starting offset, passed through to the inner adaptor
+   * @param dest where chunks from the inner adaptor are forwarded
+   * @param c the adaptor manager, passed through to the inner adaptor
+   */
+  @Override
+  public void start(String adaptorID, String type, long offset,
+      ChunkReceiver dest, AdaptorManager c) throws AdaptorException {
+    this.dest = dest;
+    inner.start(adaptorID, innerType, offset, this, c);
+  }
+
+  /** Forwards a chunk from the inner adaptor straight to the destination. */
+  @Override
+  public void add(Chunk event) throws InterruptedException {
+    dest.add(event);
+  }
+
+  /** Commit notifications are ignored by the plain wrapper. */
+  @Override
+  public void committed(long commitedByte) { }
+
+}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java Thu Dec 3 22:47:48 2009
@@ -77,8 +77,8 @@
*
* @return Stream name as a string, null if params are malformed
*/
- public String parseArgs(String params);
-
+ public String parseArgs(String datatype, String params);
+
/**
* Signals this adaptor to come to an orderly stop. The adaptor ought to push
* out all the data it can before exiting.
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java Thu Dec 3 22:47:48 2009
@@ -64,6 +64,7 @@
public void run() {
try {
+ log.debug("dir tailer starting to scan");
while(continueScanning) {
try {
long sweepStartTime = System.currentTimeMillis();
@@ -90,7 +91,7 @@
if(dir.lastModified() >= lastSweepStartTime) {
control.processAddCommand(
"add " + adaptorName +" " + type + " " + dir.getCanonicalPath() + " 0");
- }
+ }
} else {
for(File f: dir.listFiles()) {
scanDirHierarchy(f);
@@ -113,10 +114,8 @@
baseDir = new File(m.group(1));
adaptorName = m.group(2);
return baseDir + " " + adaptorName; //both params mandatory
-
}
-
@Deprecated
public long shutdown() throws AdaptorException {
return shutdown(AdaptorShutdownPolicy.GRACEFULLY);
Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java?rev=886969&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java Thu Dec 3 22:47:48 2009
@@ -0,0 +1,95 @@
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+import java.util.*;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
+
+/**
+ * Wrapper that keeps every chunk emitted by the inner adaptor in an
+ * in-memory buffer until it is reported committed. Buffers are kept in a
+ * static map keyed by adaptor ID, so an adaptor started again under the
+ * same ID first re-sends whatever is still buffered.
+ */
+public class MemBuffered extends AbstractWrapper {
+
+  static final String BUF_SIZE_OPT = "adaptor.memBufWrapper.size";
+  static final int DEFAULT_BUF_SIZE = 1024*1024; //1 MB
+
+  /**
+   * Size-bounded FIFO of uncommitted chunks. All access to the queue and
+   * the byte count goes through synchronized methods.
+   */
+  static class MemBuf {
+    long dataSizeBytes;
+    final long maxDataSize;
+    final ArrayDeque<Chunk> chunks;
+
+    public MemBuf(long maxDataSize) {
+      dataSizeBytes = 0;
+      this.maxDataSize = maxDataSize;
+      chunks = new ArrayDeque<Chunk>();
+    }
+
+    /**
+     * Appends a chunk, blocking while it would push the buffered total
+     * over maxDataSize.
+     * NOTE(review): a chunk larger than maxDataSize can never satisfy the
+     * loop condition and will block here indefinitely.
+     */
+    synchronized void add(Chunk c) throws InterruptedException{
+      int len = c.getData().length;
+      while(len + dataSizeBytes > maxDataSize)
+        wait();
+      dataSizeBytes += len;
+      chunks.add(c);
+    }
+
+    /**
+     * Drops chunks from the head of the queue whose seqID is at most l
+     * and wakes any producer blocked in add().
+     * Assumes chunks were added in increasing seqID order -- TODO confirm.
+     *
+     * @param l highest committed sequence ID
+     */
+    synchronized void removeUpTo(long l) {
+      long bytesFreed = 0;
+      while(!chunks.isEmpty()) {
+        //getFirst only peeks; the chunk has to be removed explicitly
+        Chunk c = chunks.getFirst();
+        if(c.getSeqID() > l)
+          break; //first uncommitted chunk: leave it and the rest in place
+        chunks.removeFirst();
+        bytesFreed += c.getData().length;
+      }
+
+      if(bytesFreed > 0) {
+        dataSizeBytes -= bytesFreed;
+        notifyAll();
+      }
+    }
+
+    /**
+     * @return a copy of the buffered chunks in queue order, safe to
+     *         iterate without holding this buffer's lock
+     */
+    synchronized List<Chunk> snapshot() {
+      return new ArrayList<Chunk>(chunks);
+    }
+
+  }
+
+  /** Buffers by adaptor ID; guarded by synchronizing on the map itself. */
+  static Map<String, MemBuf> buffers;
+  static {
+    buffers = new HashMap<String, MemBuf>();
+  }
+
+  MemBuf myBuffer;
+
+  /** Buffers the chunk (may block when the buffer is full), then forwards it. */
+  @Override
+  public void add(Chunk event) throws InterruptedException {
+    myBuffer.add(event);
+    dest.add(event);
+  }
+
+  /**
+   * Looks up or creates the buffer for adaptorID, re-sends any chunks
+   * still held in it, then starts the inner adaptor with this wrapper as
+   * its receiver.
+   *
+   * @throws AdaptorException if interrupted while re-sending buffered chunks
+   */
+  @Override
+  public void start(String adaptorID, String type, long offset,
+      ChunkReceiver dest, AdaptorManager manager) throws AdaptorException {
+    try {
+      this.dest = dest;
+
+      long bufSize = manager.getConfiguration().getInt(BUF_SIZE_OPT, DEFAULT_BUF_SIZE);
+      synchronized(buffers) {
+        myBuffer = buffers.get(adaptorID);
+        if(myBuffer == null) {
+          myBuffer = new MemBuf(bufSize);
+          buffers.put(adaptorID, myBuffer);
+        }
+      }
+
+      //Drain buffer into output queue. Iterate over a copy so a concurrent
+      //committed() cannot modify the queue underneath the loop.
+      for(Chunk c: myBuffer.snapshot())
+        dest.add(c);
+
+      inner.start(adaptorID, innerType, offset, this, manager);
+    } catch(InterruptedException e) {
+      Thread.currentThread().interrupt(); //keep the interrupt visible to callers
+      throw new AdaptorException(e);
+    }
+  }
+
+  /** Releases buffered chunks with sequence IDs up to and including l. */
+  @Override
+  public void committed(long l) {
+    myBuffer.removeUpTo(l);
+  }
+
+}
Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/NotifyOnCommitAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/NotifyOnCommitAdaptor.java?rev=886969&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/NotifyOnCommitAdaptor.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/NotifyOnCommitAdaptor.java Thu Dec 3 22:47:48 2009
@@ -0,0 +1,6 @@
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+
+/**
+ * An Adaptor that wants a callback when some of its data is reported
+ * as committed.
+ */
+public interface NotifyOnCommitAdaptor extends Adaptor {
+
+  /**
+   * Invoked with the position up to which data has been committed.
+   *
+   * @param commitedByte the committed position being reported
+   */
+  void committed(long commitedByte);
+}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java Thu Dec 3 22:47:48 2009
@@ -40,6 +40,7 @@
import org.apache.hadoop.chukwa.datacollection.DataFactory;
import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
+import org.apache.hadoop.chukwa.datacollection.adaptor.NotifyOnCommitAdaptor;
import org.apache.hadoop.chukwa.datacollection.agent.metrics.AgentMetrics;
import org.apache.hadoop.chukwa.datacollection.connector.Connector;
import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
@@ -328,7 +329,7 @@
log.warn("Error creating adaptor of class " + adaptorClassName);
return null;
}
- String coreParams = adaptor.parseArgs(params);
+ String coreParams = adaptor.parseArgs(dataType,params);
if(coreParams == null) {
log.warn("invalid params for adaptor: " + params);
return null;
@@ -476,6 +477,9 @@
o.offset = uuid;
}
log.debug("got commit up to " + uuid + " on " + src + " = " + o.id);
+ if(src instanceof NotifyOnCommitAdaptor) {
+ ((NotifyOnCommitAdaptor) src).committed(uuid);
+ }
return o.id;
} else {
log.warn("got commit up to " + uuid + " for adaptor " + src
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java Thu Dec 3 22:47:48 2009
@@ -178,7 +178,7 @@
resp.setStatus(200);
} catch (Throwable e) {
- log.warn("Exception talking to " + req.getRemoteHost() + " at "
+ log.warn("Exception talking to " + req.getRemoteHost() + " at t="
+ currentTime, e);
throw new ServletException(e);
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java Thu Dec 3 22:47:48 2009
@@ -71,7 +71,7 @@
File file = new File(logFile);
connector.start();
Adaptor adaptor = AdaptorFactory.createAdaptor(adaptorName);
- adaptor.parseArgs( "0 " +file.getAbsolutePath());
+ adaptor.parseArgs(recordType, "0 " +file.getAbsolutePath());
adaptor.start("", recordType, 0l,queue, AdaptorManager.NULL );
adaptor.shutdown(AdaptorShutdownPolicy.WAIT_TILL_FINISHED);
connector.shutdown();
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java Thu Dec 3 22:47:48 2009
@@ -40,15 +40,13 @@
* with the time-of-generation. The time of generation is stored, big-endian,
* in the first eight bytes of each chunk.
*/
-public class ConstRateAdaptor extends Thread implements Adaptor {
+public class ConstRateAdaptor extends AbstractAdaptor implements Runnable {
private int SLEEP_VARIANCE = 200;
private int MIN_SLEEP = 300;
- private String type;
private long offset;
private int bytesPerSec;
- private ChunkReceiver dest;
Random timeCoin;
long seed;
@@ -59,14 +57,10 @@
return type.trim() + " " + bytesPerSec + " " + seed;
}
- public void start(String adaptorID, String type,
- long offset, ChunkReceiver dest, AdaptorManager c) throws AdaptorException {
+ public void start(long offset) throws AdaptorException {
this.offset = offset;
- this.type = type;
- this.dest = dest;
- this.setName("ConstRate Adaptor_" + type);
- Configuration conf = c.getConfiguration();
+ Configuration conf = control.getConfiguration();
MIN_SLEEP = conf.getInt("constAdaptor.minSleep", MIN_SLEEP);
SLEEP_VARIANCE = conf.getInt("constAdaptor.sleepVariance", SLEEP_VARIANCE);
@@ -75,7 +69,7 @@
while(o < offset)
o += (int) ((timeCoin.nextInt(SLEEP_VARIANCE) + MIN_SLEEP) *
(long) bytesPerSec / 1000L) + 8;
- super.start(); // this is a Thread.start
+ new Thread(this).start(); // this is a Thread.start
}
public String parseArgs(String bytesPerSecParam) {
@@ -141,12 +135,6 @@
}
@Override
- public String getType() {
- return type;
- }
-
-
- @Override
public long shutdown(AdaptorShutdownPolicy shutdownPolicy) {
switch(shutdownPolicy) {
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java Thu Dec 3 22:47:48 2009
@@ -51,7 +51,7 @@
}
@Override
- public String parseArgs(String s) {
+ public String parseArgs(String d, String s) {
return s;
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java Thu Dec 3 22:47:48 2009
@@ -94,5 +94,28 @@
out.close();
}
+
+  /**
+   * Writes a text file of numbered lines ("0 abc...z", "1 abc...z", ...)
+   * for use as test input. An existing file of the same name is overwritten.
+   *
+   * @param name file name, relative to baseDir
+   * @param size number of lines to write
+   * @param baseDir directory to create the file in
+   * @return the file that was written
+   * @throws IOException if the file cannot be opened or a write fails
+   */
+  public static File makeTestFile(String name, int size,File baseDir) throws IOException {
+    File tmpOutput = new File(baseDir, name);
+    PrintWriter pw = new PrintWriter(new FileOutputStream(tmpOutput));
+    try {
+      for (int i = 0; i < size; ++i) {
+        pw.print(i + " ");
+        pw.println("abcdefghijklmnopqrstuvwxyz");
+      }
+      //PrintWriter never throws on write; checkError() flushes and reports
+      if (pw.checkError())
+        throw new IOException("error writing test file " + tmpOutput);
+    } finally {
+      pw.close();
+    }
+    return tmpOutput;
+  }
+
+  /**
+   * Convenience overload: a 10-line test file named "atemp" in baseDir.
+   *
+   * @param baseDir directory to create the file in
+   * @return the file that was written
+   */
+  public static File makeTestFile(File baseDir) throws IOException {
+    final String fileName = "atemp";
+    final int lineCount = 10;
+    return makeTestFile(fileName, lineCount, baseDir);
+  }
+
+
+  /**
+   * Convenience overload: an 80-line test file named "atemp" in the
+   * directory named by the test.build.data system property, or /tmp when
+   * that property is unset.
+   *
+   * @return the file that was written
+   */
+  public static File makeTestFile() throws IOException {
+    String dataDir = System.getProperty("test.build.data", "/tmp");
+    File targetDir = new File(dataDir);
+    return makeTestFile("atemp", 80, targetDir);
+  }
}
Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/ChukwaTestAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/ChukwaTestAdaptor.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/ChukwaTestAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/ChukwaTestAdaptor.java Thu Dec 3 22:47:48 2009
@@ -25,7 +25,6 @@
private String params = null;
private long startOffset = 0l;
- private ChunkReceiver dest = null;
@Override
public String getCurrentStatus() {
Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java Thu Dec 3 22:47:48 2009
@@ -23,6 +23,8 @@
import java.util.Map;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.hadoop.conf.*;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
import junit.framework.TestCase;
public class TestDirTailingAdaptor extends TestCase {
@@ -34,6 +36,8 @@
public void testDirTailer() throws IOException,
ChukwaAgent.AlreadyRunningException, InterruptedException {
+ DirTailingAdaptor.log.setLevel(Level.DEBUG);
+
Configuration conf = new Configuration();
baseDir = new File(System.getProperty("test.build.data", "/tmp")).getCanonicalFile();
File checkpointDir = new File(baseDir, "dirtailerTestCheckpoints");
@@ -67,7 +71,7 @@
agent.shutdown();
conf.setBoolean("chukwaAgent.checkpoint.enabled", true);
-
+ Thread.sleep(500); //wait a little bit to make sure new file ts is > last checkpoint time.
File anOldFile = File.createTempFile("oldXYZ","file", dirWithFile);
File aNewFile = File.createTempFile("new", "file", dirWithFile);
anOldFile.deleteOnExit();
Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java Thu Dec 3 22:47:48 2009
@@ -29,11 +29,12 @@
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import junit.framework.TestCase;
+import static org.apache.hadoop.chukwa.util.TempFileUtil.*;
public class TestFileAdaptor extends TestCase {
Configuration conf = new Configuration();
- File baseDir;
+ static File baseDir;
File testFile;
ChunkCatcherConnector chunks;
@@ -44,26 +45,12 @@
conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
conf.setInt("chukwaAgent.adaptor.fileadaptor.timeoutperiod", 100);
conf.setInt("chukwaAgent.adaptor.context.switch.time", 100);
- testFile = makeTestFile();
+ testFile = makeTestFile("test", 10, baseDir);
chunks = new ChunkCatcherConnector();
chunks.start();
}
- public File makeTestFile() throws IOException {
- File inDir = File.createTempFile("atemp", "file", baseDir);
- inDir.deleteOnExit();
- FileOutputStream fos = new FileOutputStream(inDir);
-
- PrintWriter pw = new PrintWriter(fos);
- for (int i = 0; i < 10; ++i) {
- pw.print(i + " ");
- pw.println("abcdefghijklmnopqrstuvwxyz");
- }
- pw.flush();
- pw.close();
- return inDir;
- }
public void testOnce() throws IOException,
ChukwaAgent.AlreadyRunningException, InterruptedException {
Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java Thu Dec 3 22:47:48 2009
@@ -27,11 +27,14 @@
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.chukwa.util.TempFileUtil.*;
public class TestCharFileTailingAdaptorUTF8 extends TestCase {
ChunkCatcherConnector chunks;
-
+ File baseDir;
public TestCharFileTailingAdaptorUTF8() {
+
+ baseDir = new File(System.getProperty("test.build.data", "/tmp"));
chunks = new ChunkCatcherConnector();
chunks.start();
}
@@ -42,7 +45,7 @@
Configuration conf = new Configuration();
conf.set("chukwaAgent.control.port", "0");
ChukwaAgent agent = new ChukwaAgent(conf);
- File testFile = makeTestFile("chukwaTest", 80);
+ File testFile = makeTestFile("chukwaTest", 80,baseDir);
String adaptorId = agent
.processAddCommand("add adaptor_test = org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8"
+ " lines " + testFile + " 0");
@@ -65,19 +68,6 @@
Thread.sleep(2000);
}
- private File makeTestFile(String name, int size) throws IOException {
- File tmpOutput = new File(System.getProperty("test.build.data", "/tmp"),
- name);
- FileOutputStream fos = new FileOutputStream(tmpOutput);
-
- PrintWriter pw = new PrintWriter(fos);
- for (int i = 0; i < size; ++i) {
- pw.print(i + " ");
- pw.println("abcdefghijklmnopqrstuvwxyz");
- }
- pw.flush();
- pw.close();
- return tmpOutput;
- }
+
}
Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptors.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptors.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptors.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptors.java Thu Dec 3 22:47:48 2009
@@ -19,6 +19,7 @@
import java.io.*;
+
import junit.framework.TestCase;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import java.util.Map;
@@ -29,6 +30,7 @@
import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.chukwa.util.TempFileUtil.*;
public class TestFileTailingAdaptors extends TestCase {
ChunkCatcherConnector chunks;
@@ -44,7 +46,7 @@
conf.setInt("chukwaAgent.adaptor.context.switch.time", 100);
conf.set("chukwaAgent.control.port", "0");
- testFile = makeTestFile("chukwaCrSepTest", 80);
+ testFile = makeTestFile("chukwaCrSepTest", 80,baseDir);
}
@@ -106,24 +108,10 @@
agent.shutdown();
}
- private File makeTestFile(String name, int size) throws IOException {
- File tmpOutput = new File(baseDir, name);
- tmpOutput.deleteOnExit();
- FileOutputStream fos = new FileOutputStream(tmpOutput);
-
- PrintWriter pw = new PrintWriter(fos);
- for (int i = 0; i < size; ++i) {
- pw.print(i + " ");
- pw.println("abcdefghijklmnopqrstuvwxyz");
- }
- pw.flush();
- pw.close();
- return tmpOutput;
- }
public void testOffsetInAdaptorName() throws IOException, ChukwaAgent.AlreadyRunningException,
InterruptedException{
- File testFile = makeTestFile("foo", 120);
+ File testFile = makeTestFile("foo", 120,baseDir);
ChukwaAgent agent = new ChukwaAgent(conf);
assertEquals(0, agent.adaptorCount());
agent.processAddCommand("add test = filetailer.FileTailingAdaptor raw " +testFile.getCanonicalPath() + " 0");
Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java Thu Dec 3 22:47:48 2009
@@ -19,6 +19,7 @@
import java.io.*;
+
import junit.framework.TestCase;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import java.util.Map;
@@ -29,6 +30,7 @@
import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.chukwa.util.TempFileUtil.*;
public class TestRawAdaptor extends TestCase {
ChunkCatcherConnector chunks;
@@ -58,7 +60,9 @@
conf.setInt("chukwaAgent.adaptor.context.switch.time", 100);
ChukwaAgent agent = new ChukwaAgent(conf);
- File testFile = makeTestFile("chukwaRawTest", 80);
+ File testFile = makeTestFile("chukwaRawTest", 80,
+ new File(System.getProperty("test.build.data", "/tmp")));
+
String adaptorId = agent
.processAddCommand("add org.apache.hadoop.chukwa.datacollection.adaptor."
+"filetailer." + name
@@ -78,27 +82,5 @@
agent.shutdown();
}
- /**
- *
- * @param name
- * @param size size in lines
- * @return
- * @throws IOException
- */
- public static File makeTestFile(String name, int size) throws IOException {
- File tmpOutput = new File(System.getProperty("test.build.data", "/tmp"),
- name);
- FileOutputStream fos = new FileOutputStream(tmpOutput);
-
- PrintWriter pw = new PrintWriter(fos);
- for (int i = 0; i < size; ++i) {
- pw.print(i + " ");
- pw.println("abcdefghijklmnopqrstuvwxyz");
- }
- pw.flush();
- pw.close();
- return tmpOutput;
- }
-
}
Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java Thu Dec 3 22:47:48 2009
@@ -117,18 +117,7 @@
}
private File makeTestFile() throws IOException {
- File tmpOutput = new File(System.getProperty("test.build.data", "/tmp"),
- "chukwaTest");
- FileOutputStream fos = new FileOutputStream(tmpOutput);
-
- PrintWriter pw = new PrintWriter(fos);
- for (int i = 0; i < 80; ++i) {
- pw.print(i + " ");
- pw.println("abcdefghijklmnopqrstuvwxyz");
- }
- pw.flush();
- pw.close();
- return tmpOutput;
+ return org.apache.hadoop.chukwa.util.TempFileUtil.makeTestFile();
}
}
Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestDelayedAcks.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestDelayedAcks.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestDelayedAcks.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestDelayedAcks.java Thu Dec 3 22:47:48 2009
@@ -49,6 +49,7 @@
import org.mortbay.jetty.servlet.ServletHolder;
import junit.framework.TestCase;
import static org.apache.hadoop.chukwa.datacollection.sender.AsyncAckSender.DelayedCommit;
+import static org.apache.hadoop.chukwa.util.TempFileUtil.*;
public class TestDelayedAcks extends TestCase {
@@ -60,6 +61,10 @@
static int ROTATEPERIOD = 2000;
int ACK_TIMEOUT = 200;
+
+
+//start an adaptor -- chunks should appear in the connector
+ //wait for timeout. More chunks should appear.
public void testAdaptorTimeout() throws Exception {
Configuration conf = new Configuration();
conf.set("chukwaAgent.control.port", "0");
@@ -71,7 +76,7 @@
ChunkCatcherConnector chunks = new ChunkCatcherConnector();
chunks.start();
assertEquals(0, agent.adaptorCount());
- File testFile = TestRawAdaptor.makeTestFile("testDA", 50);
+ File testFile = makeTestFile("testDA", 50, new File(System.getProperty("test.build.data", "/tmp")));
long len = testFile.length();
System.out.println("wrote data to " + testFile);
AdaptorResetThread restart = new AdaptorResetThread(conf, agent);
@@ -96,8 +101,6 @@
assertEquals(len, c2.getData().length);
assertTrue(resetCount > 0);
agent.shutdown();
-//start an adaptor -- chunks should appear in the connector
- //wait for timeout. More chunks should appear.
testFile.delete();
}