You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by as...@apache.org on 2009/12/03 23:47:51 UTC

svn commit: r886969 - in /hadoop/chukwa/trunk: ./ src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ src/java/org/apache/hadoop/chukwa/datacollection/agent/ src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ src/java/org/apache/...

Author: asrabkin
Date: Thu Dec  3 22:47:48 2009
New Revision: 886969

URL: http://svn.apache.org/viewvc?rev=886969&view=rev
Log:
CHUKWA-395. Support for generalized buffering of adaptor output

Added:
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/NotifyOnCommitAdaptor.java
Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/ChukwaTestAdaptor.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptors.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestDelayedAcks.java

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Thu Dec  3 22:47:48 2009
@@ -4,6 +4,8 @@
 
   NEW FEATURES
 
+    CHUKWA-395. Support for generalized buffering of adaptor data. (asrabkin)
+
     CHUKWA-405  Add a "stop all" command. (asrabkin)
  
    IMPROVEMENTS

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java Thu Dec  3 22:47:48 2009
@@ -44,8 +44,15 @@
   }
   
   public abstract void start(long offset) throws AdaptorException;
+  public abstract String parseArgs(String s);
 
   public void deregisterAndStop(boolean gracefully) {
     control.stopAdaptor(adaptorID, gracefully);
   }
+  /** Two-argument form matching Adaptor.parseArgs; ignores d and delegates to parseArgs(s). */
+  public String parseArgs(String d, String s) {
+    return parseArgs(s);
+  }
+  
+  
 }

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java?rev=886969&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java Thu Dec  3 22:47:48 2009
@@ -0,0 +1,81 @@
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+import java.util.*;
+import java.util.regex.*;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+import org.apache.hadoop.chukwa.datacollection.agent.AdaptorFactory;
+import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
+
+public class AbstractWrapper implements NotifyOnCommitAdaptor,ChunkReceiver {
+ 
+  Adaptor inner;
+  String innerClassName;
+  String innerType;
+  ChunkReceiver dest;
+
+  @Override
+  public String getCurrentStatus() {
+    return innerClassName + " " + inner.getCurrentStatus();
+  }
+
+  @Override
+  public void hardStop() throws AdaptorException {
+    inner.hardStop();
+  }
+
+  static Pattern p = Pattern.compile("([^ ]+) +([^ ].*)");
+  
+  /**
+   * Note that the name of the inner class will get parsed out as a type.
+   * Returns null if params, the inner class, or the inner params are unusable.
+   */
+  @Override
+  public String parseArgs(String innerClassName, String params) {
+    Matcher m = p.matcher(params);
+    this.innerClassName = innerClassName;
+    if(!m.matches())
+      return null;
+    innerType = m.group(1);
+    inner = AdaptorFactory.createAdaptor(innerClassName);
+    if(inner == null) return null; // assumes the factory signals failure with null
+    String innerCoreParams = inner.parseArgs(innerType, m.group(2));
+    return innerCoreParams == null ? null : innerClassName + innerCoreParams;
+  }
+
+  @Override
+  public long shutdown() throws AdaptorException {
+    return inner.shutdown();
+  }
+
+  @Override
+  public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
+      throws AdaptorException {
+    return inner.shutdown(shutdownPolicy);
+  }
+
+  @Override
+  public String getType() {
+    return innerType;
+  }
+
+  /**
+   * Starts the inner adaptor with this wrapper as its receiver. The type argument is
+   * presumably the inner class name (see parseArgs), so the parsed innerType is passed on.
+   */
+  @Override
+  public void start(String adaptorID, String type, long offset,
+      ChunkReceiver dest, AdaptorManager c) throws AdaptorException {
+    this.dest = dest;
+    inner.start(adaptorID, innerType, offset, this, c);
+  }
+
+  @Override
+  public void add(Chunk event) throws InterruptedException {
+    dest.add(event);
+  }
+
+  @Override
+  public void committed(long commitedByte) { }
+
+}

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java Thu Dec  3 22:47:48 2009
@@ -77,8 +77,8 @@
    * 
    * @return Stream name as a string, null if params are malformed
    */
-  public String parseArgs(String params);
-
+  public String parseArgs(String datatype, String params);
+  
   /**
    * Signals this adaptor to come to an orderly stop. The adaptor ought to push
    * out all the data it can before exiting.

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java Thu Dec  3 22:47:48 2009
@@ -64,6 +64,7 @@
   
   public void run() {
     try {
+      log.debug("dir tailer starting to scan");
       while(continueScanning) {
         try {
           long sweepStartTime = System.currentTimeMillis();
@@ -90,7 +91,7 @@
       if(dir.lastModified() >= lastSweepStartTime) {
         control.processAddCommand(
             "add " + adaptorName +" " + type + " " + dir.getCanonicalPath() + " 0");
-      }
+      } 
     } else {
       for(File f: dir.listFiles()) {
         scanDirHierarchy(f);
@@ -113,10 +114,8 @@
     baseDir = new File(m.group(1));
     adaptorName = m.group(2);
     return baseDir + " " + adaptorName; //both params mandatory
-
   }
 
-
   @Deprecated
   public long shutdown() throws AdaptorException {
     return shutdown(AdaptorShutdownPolicy.GRACEFULLY);

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java?rev=886969&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java Thu Dec  3 22:47:48 2009
@@ -0,0 +1,95 @@
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+import java.util.*;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
+
+public class MemBuffered extends AbstractWrapper {
+  
+  static final String BUF_SIZE_OPT = "adaptor.memBufWrapper.size";
+  static final int DEFAULT_BUF_SIZE = 1024*1024; //1 MB
+  
+  static class MemBuf {
+    long dataSizeBytes;
+    final long maxDataSize;
+    final ArrayDeque<Chunk> chunks;
+    
+    public MemBuf(long maxDataSize) {
+      dataSizeBytes = 0;
+      this.maxDataSize = maxDataSize;
+      chunks = new ArrayDeque<Chunk>();
+    }
+    // Blocks until c fits; an empty buffer always accepts, so an oversized chunk can't hang.
+    synchronized void add(Chunk c) throws InterruptedException{
+      int len = c.getData().length;
+      while(dataSizeBytes > 0 && len + dataSizeBytes > maxDataSize)
+        wait();
+      dataSizeBytes += len;
+      chunks.add(c);
+    }
+    /** Drops leading chunks whose seqID is <= l (i.e. committed) and wakes blocked adders. */
+    synchronized void removeUpTo(long l) {
+      long bytesFreed = 0;
+      while(!chunks.isEmpty()) {
+        // peek rather than remove: the first uncommitted chunk must stay at the head
+        Chunk c = chunks.peekFirst();
+        if(c.getSeqID() > l)
+          break;
+        chunks.removeFirst();
+        bytesFreed += c.getData().length;
+      }
+      
+      if(bytesFreed > 0) {
+        dataSizeBytes -= bytesFreed;
+        notifyAll(); // add() may be waiting for room
+      }
+    }
+    
+  }
+
+  static Map<String, MemBuf> buffers;
+  static {
+    buffers = new HashMap<String, MemBuf>();
+  }
+  
+  MemBuf myBuffer;
+  
+  @Override
+  public void add(Chunk event) throws InterruptedException {
+    myBuffer.add(event);
+    dest.add(event);
+  }
+  
+  /** Binds (or creates) the buffer for adaptorID, replays it to dest, then starts inner. */
+  @Override
+  public void start(String adaptorID, String type, long offset,
+      ChunkReceiver dest, AdaptorManager manager) throws AdaptorException {
+    try {
+      this.dest = dest;
+      long bufSize = manager.getConfiguration().getInt(BUF_SIZE_OPT, DEFAULT_BUF_SIZE);
+      synchronized(buffers) {
+        myBuffer = buffers.get(adaptorID);
+        if(myBuffer == null) {
+          myBuffer = new MemBuf(bufSize);
+          buffers.put(adaptorID, myBuffer);
+        }
+      }
+      //Snapshot under the buffer's lock (committed() may mutate it), then replay
+      List<Chunk> pending;
+      synchronized(myBuffer) { pending = new ArrayList<Chunk>(myBuffer.chunks); }
+      for(Chunk c: pending)
+        dest.add(c); //outside the lock: dest.add may block
+      inner.start(adaptorID, innerType, offset, this, manager);
+    } catch(InterruptedException e) {
+      Thread.currentThread().interrupt(); //keep interrupt status for callers
+      throw new AdaptorException(e);
+    }
+  }
+  
+  @Override
+  public void committed(long l) {
+    myBuffer.removeUpTo(l);
+  }
+
+}

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/NotifyOnCommitAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/NotifyOnCommitAdaptor.java?rev=886969&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/NotifyOnCommitAdaptor.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/NotifyOnCommitAdaptor.java Thu Dec  3 22:47:48 2009
@@ -0,0 +1,6 @@
+package org.apache.hadoop.chukwa.datacollection.adaptor;
+
+/** An Adaptor that is called back via committed(long) when the agent processes a commit for it. */
+public interface NotifyOnCommitAdaptor extends Adaptor {
+    abstract void committed(long commitedByte);
+}

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java Thu Dec  3 22:47:48 2009
@@ -40,6 +40,7 @@
 import org.apache.hadoop.chukwa.datacollection.DataFactory;
 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
 import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException;
+import org.apache.hadoop.chukwa.datacollection.adaptor.NotifyOnCommitAdaptor;
 import org.apache.hadoop.chukwa.datacollection.agent.metrics.AgentMetrics;
 import org.apache.hadoop.chukwa.datacollection.connector.Connector;
 import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
@@ -328,7 +329,7 @@
         log.warn("Error creating adaptor of class " + adaptorClassName);
         return null;
       }
-      String coreParams = adaptor.parseArgs(params);
+      String coreParams = adaptor.parseArgs(dataType,params);
       if(coreParams == null) {
         log.warn("invalid params for adaptor: " + params);
         return null;
@@ -476,6 +477,9 @@
           o.offset = uuid;
       }
       log.debug("got commit up to " + uuid + " on " + src + " = " + o.id);
+      if(src instanceof NotifyOnCommitAdaptor) {
+        ((NotifyOnCommitAdaptor) src).committed(uuid);
+      }
       return o.id;
     } else {
       log.warn("got commit up to " + uuid + "  for adaptor " + src

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java Thu Dec  3 22:47:48 2009
@@ -178,7 +178,7 @@
       resp.setStatus(200);
 
     } catch (Throwable e) {
-      log.warn("Exception talking to " + req.getRemoteHost() + " at "
+      log.warn("Exception talking to " + req.getRemoteHost() + " at t="
           + currentTime, e);
       throw new ServletException(e);
     }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java Thu Dec  3 22:47:48 2009
@@ -71,7 +71,7 @@
     File file = new File(logFile);
     connector.start();
     Adaptor adaptor = AdaptorFactory.createAdaptor(adaptorName);
-    adaptor.parseArgs( "0 " +file.getAbsolutePath());
+    adaptor.parseArgs(recordType, "0 " +file.getAbsolutePath());
     adaptor.start("", recordType,  0l,queue, AdaptorManager.NULL );
     adaptor.shutdown(AdaptorShutdownPolicy.WAIT_TILL_FINISHED);
     connector.shutdown();

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java Thu Dec  3 22:47:48 2009
@@ -40,15 +40,13 @@
  * with the time-of-generation. The time of generation is stored, big-endian,
  * in the first eight bytes of each chunk.
  */
-public class ConstRateAdaptor extends Thread implements Adaptor {
+public class ConstRateAdaptor extends AbstractAdaptor implements Runnable {
 
   private int SLEEP_VARIANCE = 200;
   private int MIN_SLEEP = 300;
 
-  private String type;
   private long offset;
   private int bytesPerSec;
-  private ChunkReceiver dest;
 
   Random timeCoin;
   long seed;
@@ -59,14 +57,10 @@
     return type.trim() + " " + bytesPerSec + " " + seed;
   }
 
-  public void start(String adaptorID, String type, 
-      long offset, ChunkReceiver dest, AdaptorManager c) throws AdaptorException {
+  public void start(long offset) throws AdaptorException {
 
     this.offset = offset;
-    this.type = type;
-    this.dest = dest;
-    this.setName("ConstRate Adaptor_" + type);
-    Configuration conf = c.getConfiguration();
+    Configuration conf = control.getConfiguration();
     MIN_SLEEP = conf.getInt("constAdaptor.minSleep", MIN_SLEEP);
     SLEEP_VARIANCE = conf.getInt("constAdaptor.sleepVariance", SLEEP_VARIANCE);
     
@@ -75,7 +69,7 @@
     while(o < offset)
       o += (int) ((timeCoin.nextInt(SLEEP_VARIANCE) + MIN_SLEEP) *
           (long) bytesPerSec / 1000L) + 8;
-    super.start(); // this is a Thread.start
+    new Thread(this).start(); // this is a Thread.start
   }
 
   public String parseArgs(String bytesPerSecParam) {
@@ -141,12 +135,6 @@
   }
 
   @Override
-  public String getType() {
-    return type;
-  }
-
-
-  @Override
   public long shutdown(AdaptorShutdownPolicy shutdownPolicy) {
     
     switch(shutdownPolicy) {

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java Thu Dec  3 22:47:48 2009
@@ -51,7 +51,7 @@
   }
 
   @Override
-  public String parseArgs(String s) {
+  public String parseArgs(String d, String s) {
     return s;
   }
 

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java Thu Dec  3 22:47:48 2009
@@ -94,5 +94,28 @@
      out.close();
    }
    
+   /** Writes size numbered lines to baseDir/name; throws if any write failed. */
+   public static File makeTestFile(String name, int size,File baseDir) throws IOException {
+     File tmpOutput = new File(baseDir, name);
+     PrintWriter pw = new PrintWriter(new FileOutputStream(tmpOutput));
+     try {
+       for (int i = 0; i < size; ++i) {
+         pw.print(i + " ");
+         pw.println("abcdefghijklmnopqrstuvwxyz");
+       }
+       if (pw.checkError()) // PrintWriter swallows IOExceptions; checkError also flushes
+         throw new IOException("failed writing test file " + tmpOutput);
+     } finally { pw.close(); }
+     return tmpOutput;
+   }
+   
+   public static File makeTestFile(File baseDir) throws IOException {
+     return makeTestFile("atemp",10, baseDir);
+   }
+   
+
+   public static File makeTestFile() throws IOException {
+     return makeTestFile("atemp",80, new File(System.getProperty("test.build.data", "/tmp")));
+   }
   
 }

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/ChukwaTestAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/ChukwaTestAdaptor.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/ChukwaTestAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/ChukwaTestAdaptor.java Thu Dec  3 22:47:48 2009
@@ -25,7 +25,6 @@
 
   private String params = null;
   private long startOffset = 0l;
-  private ChunkReceiver dest = null;
 
   @Override
   public String getCurrentStatus() {

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java Thu Dec  3 22:47:48 2009
@@ -23,6 +23,8 @@
 import java.util.Map;
 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
 import org.apache.hadoop.conf.*;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
 import junit.framework.TestCase;
 
 public class TestDirTailingAdaptor extends TestCase {
@@ -34,6 +36,8 @@
   public void testDirTailer() throws IOException,
   ChukwaAgent.AlreadyRunningException, InterruptedException {
     
+    DirTailingAdaptor.log.setLevel(Level.DEBUG);
+    
     Configuration conf = new Configuration();
     baseDir = new File(System.getProperty("test.build.data", "/tmp")).getCanonicalFile();
     File checkpointDir = new File(baseDir, "dirtailerTestCheckpoints");
@@ -67,7 +71,7 @@
     agent.shutdown();
 
     conf.setBoolean("chukwaAgent.checkpoint.enabled", true);
-
+    Thread.sleep(500); //wait a little bit to make sure new file ts is > last checkpoint time.
     File anOldFile = File.createTempFile("oldXYZ","file", dirWithFile);
     File aNewFile = File.createTempFile("new", "file", dirWithFile);
     anOldFile.deleteOnExit();

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestFileAdaptor.java Thu Dec  3 22:47:48 2009
@@ -29,11 +29,12 @@
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import junit.framework.TestCase;
+import static org.apache.hadoop.chukwa.util.TempFileUtil.*;
 
 public class TestFileAdaptor extends TestCase {
 
   Configuration conf = new Configuration();
-  File baseDir;
+  static File baseDir;
   File testFile;
   ChunkCatcherConnector chunks;
   
@@ -44,26 +45,12 @@
     conf.setBoolean("chukwaAgent.checkpoint.enabled", false);
     conf.setInt("chukwaAgent.adaptor.fileadaptor.timeoutperiod", 100);
     conf.setInt("chukwaAgent.adaptor.context.switch.time", 100);
-    testFile = makeTestFile();
+    testFile = makeTestFile("test", 10, baseDir);
 
     chunks = new ChunkCatcherConnector();
     chunks.start();
   }
   
-  public File makeTestFile() throws IOException {
-    File inDir = File.createTempFile("atemp", "file", baseDir);
-    inDir.deleteOnExit();
-    FileOutputStream fos = new FileOutputStream(inDir);
-
-    PrintWriter pw = new PrintWriter(fos);
-    for (int i = 0; i < 10; ++i) {
-      pw.print(i + " ");
-      pw.println("abcdefghijklmnopqrstuvwxyz");
-    }
-    pw.flush();
-    pw.close();
-    return inDir;
-  }
   
   public void testOnce()  throws IOException,
   ChukwaAgent.AlreadyRunningException, InterruptedException {

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestCharFileTailingAdaptorUTF8.java Thu Dec  3 22:47:48 2009
@@ -27,11 +27,14 @@
 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
 import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
 import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.chukwa.util.TempFileUtil.*;
 
 public class TestCharFileTailingAdaptorUTF8 extends TestCase {
   ChunkCatcherConnector chunks;
-
+  File baseDir;
   public TestCharFileTailingAdaptorUTF8() {
+
+    baseDir = new File(System.getProperty("test.build.data", "/tmp"));
     chunks = new ChunkCatcherConnector();
     chunks.start();
   }
@@ -42,7 +45,7 @@
     Configuration conf = new Configuration();
     conf.set("chukwaAgent.control.port", "0");
     ChukwaAgent agent = new ChukwaAgent(conf);
-    File testFile = makeTestFile("chukwaTest", 80);
+    File testFile = makeTestFile("chukwaTest", 80,baseDir);
     String adaptorId = agent
         .processAddCommand("add adaptor_test = org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8"
             + " lines " + testFile + " 0");
@@ -65,19 +68,6 @@
     Thread.sleep(2000);
   }
 
-  private File makeTestFile(String name, int size) throws IOException {
-    File tmpOutput = new File(System.getProperty("test.build.data", "/tmp"),
-        name);
-    FileOutputStream fos = new FileOutputStream(tmpOutput);
-
-    PrintWriter pw = new PrintWriter(fos);
-    for (int i = 0; i < size; ++i) {
-      pw.print(i + " ");
-      pw.println("abcdefghijklmnopqrstuvwxyz");
-    }
-    pw.flush();
-    pw.close();
-    return tmpOutput;
-  }
+  
 
 }

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptors.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptors.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptors.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestFileTailingAdaptors.java Thu Dec  3 22:47:48 2009
@@ -19,6 +19,7 @@
 
 
 import java.io.*;
+
 import junit.framework.TestCase;
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import java.util.Map;
@@ -29,6 +30,7 @@
 import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
 import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
 import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.chukwa.util.TempFileUtil.*;
 
 public class TestFileTailingAdaptors extends TestCase {
   ChunkCatcherConnector chunks;
@@ -44,7 +46,7 @@
     conf.setInt("chukwaAgent.adaptor.context.switch.time", 100);
     conf.set("chukwaAgent.control.port", "0");
 
-    testFile = makeTestFile("chukwaCrSepTest", 80);
+    testFile = makeTestFile("chukwaCrSepTest", 80,baseDir);
 
   }
 
@@ -106,24 +108,10 @@
     agent.shutdown();
   }
 
-  private File makeTestFile(String name, int size) throws IOException {
-    File tmpOutput = new File(baseDir, name);
-    tmpOutput.deleteOnExit();
-    FileOutputStream fos = new FileOutputStream(tmpOutput);
-
-    PrintWriter pw = new PrintWriter(fos);
-    for (int i = 0; i < size; ++i) {
-      pw.print(i + " ");
-      pw.println("abcdefghijklmnopqrstuvwxyz");
-    }
-    pw.flush();
-    pw.close();
-    return tmpOutput;
-  }
   
   public void testOffsetInAdaptorName() throws IOException, ChukwaAgent.AlreadyRunningException,
   InterruptedException{
-    File testFile = makeTestFile("foo", 120);
+    File testFile = makeTestFile("foo", 120,baseDir);
     ChukwaAgent agent = new ChukwaAgent(conf);
     assertEquals(0, agent.adaptorCount());
     agent.processAddCommand("add test = filetailer.FileTailingAdaptor raw " +testFile.getCanonicalPath() + " 0");

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java Thu Dec  3 22:47:48 2009
@@ -19,6 +19,7 @@
 
 
 import java.io.*;
+
 import junit.framework.TestCase;
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import java.util.Map;
@@ -29,6 +30,7 @@
 import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
 import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
 import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.chukwa.util.TempFileUtil.*;
 
 public class TestRawAdaptor extends TestCase {
   ChunkCatcherConnector chunks;
@@ -58,7 +60,9 @@
     conf.setInt("chukwaAgent.adaptor.context.switch.time", 100);
     ChukwaAgent agent = new ChukwaAgent(conf);
 
-    File testFile = makeTestFile("chukwaRawTest", 80);
+    File testFile = makeTestFile("chukwaRawTest", 80, 
+        new File(System.getProperty("test.build.data", "/tmp")));
+    
     String adaptorId = agent
         .processAddCommand("add org.apache.hadoop.chukwa.datacollection.adaptor."
             +"filetailer." + name
@@ -78,27 +82,5 @@
     agent.shutdown();
   }
 
-  /**
-   * 
-   * @param name
-   * @param size size in lines
-   * @return
-   * @throws IOException
-   */
-  public static File makeTestFile(String name, int size) throws IOException {
-    File tmpOutput = new File(System.getProperty("test.build.data", "/tmp"),
-        name);
-    FileOutputStream fos = new FileOutputStream(tmpOutput);
-
-    PrintWriter pw = new PrintWriter(fos);
-    for (int i = 0; i < size; ++i) {
-      pw.print(i + " ");
-      pw.println("abcdefghijklmnopqrstuvwxyz");
-    }
-    pw.flush();
-    pw.close();
-    return tmpOutput;
-  }
-
 
 }

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestStartAtOffset.java Thu Dec  3 22:47:48 2009
@@ -117,18 +117,7 @@
   }
 
   private File makeTestFile() throws IOException {
-    File tmpOutput = new File(System.getProperty("test.build.data", "/tmp"),
-        "chukwaTest");
-    FileOutputStream fos = new FileOutputStream(tmpOutput);
-
-    PrintWriter pw = new PrintWriter(fos);
-    for (int i = 0; i < 80; ++i) {
-      pw.print(i + " ");
-      pw.println("abcdefghijklmnopqrstuvwxyz");
-    }
-    pw.flush();
-    pw.close();
-    return tmpOutput;
+    return org.apache.hadoop.chukwa.util.TempFileUtil.makeTestFile();
   }
 
 }

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestDelayedAcks.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestDelayedAcks.java?rev=886969&r1=886968&r2=886969&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestDelayedAcks.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestDelayedAcks.java Thu Dec  3 22:47:48 2009
@@ -49,6 +49,7 @@
 import org.mortbay.jetty.servlet.ServletHolder;
 import junit.framework.TestCase;
 import static org.apache.hadoop.chukwa.datacollection.sender.AsyncAckSender.DelayedCommit;
+import static org.apache.hadoop.chukwa.util.TempFileUtil.*;
 
 public class TestDelayedAcks extends TestCase {
   
@@ -60,6 +61,10 @@
   static int ROTATEPERIOD = 2000;
   
   int ACK_TIMEOUT = 200;
+  
+
+//start an adaptor -- chunks should appear in the connector
+    //wait for timeout.  More chunks should appear.
   public void testAdaptorTimeout() throws Exception {
     Configuration conf = new Configuration();
     conf.set("chukwaAgent.control.port", "0");
@@ -71,7 +76,7 @@
     ChunkCatcherConnector chunks = new ChunkCatcherConnector();
     chunks.start();
     assertEquals(0, agent.adaptorCount());
-    File testFile = TestRawAdaptor.makeTestFile("testDA", 50);
+    File testFile = makeTestFile("testDA", 50, new File(System.getProperty("test.build.data", "/tmp")));
     long len = testFile.length();
     System.out.println("wrote data to " + testFile);
     AdaptorResetThread restart = new AdaptorResetThread(conf, agent);
@@ -96,8 +101,6 @@
     assertEquals(len, c2.getData().length);
     assertTrue(resetCount > 0);
     agent.shutdown();
-//start an adaptor -- chunks should appear in the connector
-    //wait for timeout.  More chunks should appear.
     
     testFile.delete();
   }