You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by as...@apache.org on 2009/12/21 22:05:18 UTC

svn commit: r892976 - in /hadoop/chukwa/trunk: ./ src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/ src/java/org/apache/hadoop/chukwa/datacollection/agent/ src/java/org/apache...

Author: asrabkin
Date: Mon Dec 21 21:05:17 2009
New Revision: 892976

URL: http://svn.apache.org/viewvc?rev=892976&view=rev
Log:
CHUKWA-421. Use modification time to detect rotation.

Added:
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java
Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestLogRotate.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=892976&r1=892975&r2=892976&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Mon Dec 21 21:05:17 2009
@@ -18,6 +18,8 @@
  
    IMPROVEMENTS
 
+    CHUKWA-421. Use modification time to detect rotation. (asrabkin)
+
     CHUKWA-432. PipelineableWriter becomes an abstract class. (asrabkin)
 
     CHUKWA-429. Update HDFS heatmap color with rainbow colors. (Eric Yang)

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java?rev=892976&r1=892975&r2=892976&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractAdaptor.java Mon Dec 21 21:05:17 2009
@@ -35,11 +35,10 @@
 
   @Override
   public final void start(String adaptorID, String type, long offset,
-      ChunkReceiver dest, AdaptorManager c) throws AdaptorException {
+      ChunkReceiver dest) throws AdaptorException {
     this.adaptorID = adaptorID;
     this.type = type;
     this.dest=dest;
-    control = c;
     start(offset);
   }
   
@@ -50,7 +49,8 @@
     control.stopAdaptor(adaptorID, gracefully);
   }
   
-  public String parseArgs(String d, String s) {
+  public String parseArgs(String d, String s, AdaptorManager c) {
+    control = c;
     return parseArgs(s);
   }
   

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java?rev=892976&r1=892975&r2=892976&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/AbstractWrapper.java Mon Dec 21 21:05:17 2009
@@ -13,6 +13,7 @@
   String innerClassName;
   String innerType;
   ChunkReceiver dest;
+  AdaptorManager manager;
 
   @Override
   public String getCurrentStatus() {
@@ -30,14 +31,15 @@
    * Note that the name of the inner class will get parsed out as a type
    */
   @Override
-  public String parseArgs(String innerClassName, String params) {
+  public String parseArgs(String innerClassName, String params, AdaptorManager a) {
+    manager = a;
     Matcher m = p.matcher(params);
     this.innerClassName = innerClassName;
     String innerCoreParams;
     if(m.matches()) {
       innerType = m.group(1);
       inner = AdaptorFactory.createAdaptor(innerClassName);
-      innerCoreParams = inner.parseArgs(innerType,m.group(2));
+      innerCoreParams = inner.parseArgs(innerType,m.group(2),a);
       return innerClassName + innerCoreParams;
     }
     else return null;
@@ -64,10 +66,10 @@
    */
   @Override
   public void start(String adaptorID, String type, long offset,
-      ChunkReceiver dest, AdaptorManager c) throws AdaptorException {
+      ChunkReceiver dest) throws AdaptorException {
     String dummyAdaptorID = adaptorID;
     this.dest = dest;
-    inner.start(dummyAdaptorID, type, offset, this, c);
+    inner.start(dummyAdaptorID, type, offset, this);
   }
 
   @Override

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java?rev=892976&r1=892975&r2=892976&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java Mon Dec 21 21:05:17 2009
@@ -55,7 +55,7 @@
    * @throws AdaptorException
    */
   public void start(String adaptorID, String type, long offset,
-      ChunkReceiver dest, AdaptorManager c) throws AdaptorException;
+      ChunkReceiver dest) throws AdaptorException;
 
   /**
    * Return the adaptor's state Should not include class name or byte
@@ -77,7 +77,7 @@
    * 
    * @return Stream name as a string, null if params are malformed
    */
-  public String parseArgs(String datatype, String params);
+  public String parseArgs(String datatype, String params, AdaptorManager c);
   
   /**
    * Signals this adaptor to come to an orderly stop. The adaptor ought to push

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java?rev=892976&r1=892975&r2=892976&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/MemBuffered.java Mon Dec 21 21:05:17 2009
@@ -63,7 +63,7 @@
   
   @Override
   public void start(String adaptorID, String type, long offset,
-      ChunkReceiver dest, AdaptorManager manager) throws AdaptorException {
+      ChunkReceiver dest) throws AdaptorException {
     try {
       String dummyAdaptorID = adaptorID;
       this.dest = dest;
@@ -81,7 +81,7 @@
       for(Chunk c:myBuffer.chunks)
         dest.add(c);
       
-      inner.start(dummyAdaptorID, innerType, offset, this, manager);
+      inner.start(dummyAdaptorID, innerType, offset, this);
     } catch(InterruptedException e) {
      throw new AdaptorException(e);
     }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java?rev=892976&r1=892975&r2=892976&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailer.java Mon Dec 21 21:05:17 2009
@@ -48,21 +48,14 @@
    */
   int DEFAULT_SAMPLE_PERIOD_MS = 1000 * 2;
   int SAMPLE_PERIOD_MS = DEFAULT_SAMPLE_PERIOD_MS;
-  private static Configuration conf = null;
+//  private Configuration conf = null;
   public static final int MAX_SAMPLE_PERIOD = 60 * 1000;
 
-  FileTailer() {
-    if (conf == null) {
-      ChukwaAgent agent = ChukwaAgent.getAgent();
-      if (agent != null) {
-        conf = agent.getConfiguration();
-        if (conf != null) {
-          SAMPLE_PERIOD_MS = conf.getInt(
-              "chukwaAgent.adaptor.context.switch.time",
-              DEFAULT_SAMPLE_PERIOD_MS);
-        }
-      }
-    }
+  FileTailer(Configuration conf) {
+ //   this.conf = conf;
+    SAMPLE_PERIOD_MS = conf.getInt(
+        "chukwaAgent.adaptor.context.switch.time",
+        DEFAULT_SAMPLE_PERIOD_MS);
     eq = DataFactory.getInstance().getEventQueue();
 
     // iterations are much more common than adding a new adaptor

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java?rev=892976&r1=892975&r2=892976&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java Mon Dec 21 21:05:17 2009
@@ -57,7 +57,7 @@
   protected static FileTailer tailer;
   
   static {
-    tailer = new FileTailer();
+    tailer = null;
     log = Logger.getLogger(FileTailingAdaptor.class);
   }
   
@@ -77,9 +77,11 @@
 
   @Override
   public void start(long offset) {
-    conf = control.getConfiguration();
-    MAX_READ_SIZE = conf.getInt(MAX_READ_SIZE_OPT, DEFAULT_MAX_READ_SIZE);
-    this.fileReadOffset = offset;    
+    synchronized(LWFTAdaptor.class) {
+      if(tailer == null)
+        tailer = new FileTailer(control.getConfiguration());
+    }
+    this.fileReadOffset = offset - offsetOfFirstByte;    
     tailer.startWatchingFile(this);
   }
   
@@ -97,12 +99,15 @@
   public String getStreamName() {
     return toWatch.getPath();
   }
-
+  
   @Override
   public String parseArgs(String params) { 
+    conf = control.getConfiguration();
+    MAX_READ_SIZE = conf.getInt(MAX_READ_SIZE_OPT, DEFAULT_MAX_READ_SIZE);
+
     Pattern cmd = Pattern.compile("(\\d+)\\s+(.+)\\s?");
     Matcher m = cmd.matcher(params);
-    if (m.matches()) {
+    if (m.matches()) { //check for first-byte offset. If absent, assume we just got a path.
       offsetOfFirstByte = Long.parseLong(m.group(1));
       toWatch = new File(m.group(2));
     } else {
@@ -228,6 +233,7 @@
   }
 
   private void handleShrunkenFile(long measuredLen) {
+    log.info("file "+ toWatch +"shrank from " + fileReadOffset + " to " + measuredLen);
     offsetOfFirstByte = measuredLen;
     fileReadOffset = 0;
   }

Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java?rev=892976&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java Mon Dec 21 21:05:17 2009
@@ -0,0 +1,164 @@
+package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.Collections;
+import java.util.Queue;
+import java.util.LinkedList;
+import org.apache.hadoop.chukwa.datacollection.ChunkReceiver;
+
+/**
+ * Checkpoint state:
+ *   date modified of most-recently tailed file, offset of first byte of that file,
+ *   then regular FTA args
+ *
+ */
+public class RCheckFTAdaptor extends LWFTAdaptor implements FileFilter {
+  
+  private static class FPair implements Comparable<FPair> {
+    File f;
+    long mod;
+    FPair(File f) {
+      this.f = f;
+      mod = f.lastModified();
+    }
+    /**
+     * -1   implies this is LESS THAN o
+     */
+    @Override
+    public int compareTo(FPair o) {
+      if(mod < o.mod)
+        return -1;
+      else if (mod > o.mod)
+        return 1;
+      //want toWatch to be last
+      else return (o.f.getName().compareTo(f.getName()));//shouldn't happen?
+    }
+  }
+  
+  long prevFileLastModDate = 0;
+  LinkedList<FPair> fileQ = new LinkedList<FPair>(); 
+  String fBaseName;
+  File cur; //this is the actual physical file being watched.  
+            // in contrast, toWatch is the path specified by the user
+  boolean caughtUp = false;
+  /**
+   * Check for date-modified and offset; if absent assume we just got a name.
+   */
+  @Override
+  public String parseArgs(String params) { 
+    Pattern cmd = Pattern.compile("d:(\\d+)\\s+(\\d+)\\s+(.+)\\s?");
+    Matcher m = cmd.matcher(params);
+    if (m.matches()) {
+      prevFileLastModDate = Long.parseLong(m.group(1));
+      offsetOfFirstByte = Long.parseLong(m.group(2));
+      toWatch = new File(m.group(3)).getAbsoluteFile();
+    } else {
+      toWatch = new File(params.trim()).getAbsoluteFile();
+    }
+    fBaseName = toWatch.getName();
+    return toWatch.getAbsolutePath();
+  }
+  
+  public String getCurrentStatus() {
+    return type.trim() + " d:" + prevFileLastModDate + " "  + offsetOfFirstByte + " " + toWatch.getPath();
+  }
+
+  @Override
+  public boolean accept(File pathname) {
+    return pathname.getName().startsWith(fBaseName) &&
+     ( pathname.getName().equals(fBaseName) ||
+       pathname.lastModified() > prevFileLastModDate);
+  }
+  
+  
+  protected void mkFileQ() {
+    
+    File toWatchDir = toWatch.getParentFile();
+    File[] candidates = toWatchDir.listFiles(this);
+    if(candidates == null) {
+      log.error(toWatchDir + " is not a directory");
+    } else {
+      log.debug("saw " + candidates.length + " files matching pattern");
+      fileQ = new LinkedList<FPair>();
+      for(File f:candidates)
+        fileQ.add(new FPair(f));
+      Collections.sort(fileQ);
+    } 
+  }
+  
+  protected void advanceQ() {
+    FPair next = fileQ.poll();
+    if(next != null) {
+      cur = next.f;
+      caughtUp = toWatch.equals(cur);
+      if(caughtUp && !fileQ.isEmpty()) 
+        log.warn("expected rotation queue to be empty when caught up...");
+    }
+    else {
+      cur = null;
+      caughtUp = true;
+    }
+  }
+  
+  @Override
+  public void start(long offset) {
+    mkFileQ(); //figure out what to watch
+    advanceQ();
+    super.start(offset);
+  }
+  
+  @Override
+  public synchronized boolean tailFile(ChunkReceiver eq)
+  throws InterruptedException {
+    boolean hasMoreData = false;
+    try {
+      
+      if(caughtUp) {
+        //we're caught up and watching an unrotated file
+        mkFileQ(); //figure out what to watch
+        advanceQ();
+      }
+      if(cur == null) //file we're watching doesn't exist
+        return false;
+      
+      log.debug("treating " + cur + " as " + toWatch);
+      
+      long len = cur.length();
+      long tsPreTail = cur.exists() ? cur.lastModified() : prevFileLastModDate;
+      if(len < fileReadOffset) {
+        log.info("file "+ cur +" shrank from " + fileReadOffset + " to " + len);
+        //no unseen changes to prev version, since mod date is older than last scan.
+        offsetOfFirstByte += fileReadOffset;
+        fileReadOffset = 0;
+      } else if(len > fileReadOffset) {
+        log.debug("slurping from " + cur+ " at offset " + fileReadOffset);
+        RandomAccessFile reader = new RandomAccessFile(cur, "r");
+        slurp(len, reader);
+        reader.close();
+      } else {
+        //we're either caught up or at EOF
+        if (!caughtUp) {
+          prevFileLastModDate = cur.lastModified();
+          //Hit EOF on an already-rotated file. Move on!
+          offsetOfFirstByte += fileReadOffset;
+          fileReadOffset = 0;
+          advanceQ();
+          log.debug("not caught up, and hit EOF.  Moving forward in queue to " + cur);
+        } else 
+          prevFileLastModDate = tsPreTail;
+
+      }
+        
+    } catch(IOException e) {
+      log.warn("IOException in tailer", e);
+      deregisterAndStop(false);
+    }
+    
+    return hasMoreData;
+  }
+}

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java?rev=892976&r1=892975&r2=892976&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java Mon Dec 21 21:05:17 2009
@@ -263,7 +263,7 @@
   public void tryToBind() throws IOException {
     if(ALLOW_REMOTE)
       s = new ServerSocket(portno);
-    else {
+    else {  //FIXME: is there a way to allow all local addresses? (including IPv6 local)
       s = new ServerSocket();
       s.bind(new InetSocketAddress(InetAddress.getByAddress(new byte[] {127,0,0,1}), portno));
     }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=892976&r1=892975&r2=892976&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java Mon Dec 21 21:05:17 2009
@@ -329,7 +329,7 @@
         log.warn("Error creating adaptor of class " + adaptorClassName);
         return null;
       }
-      String coreParams = adaptor.parseArgs(dataType,params);
+      String coreParams = adaptor.parseArgs(dataType,params,this);
       if(coreParams == null) {
         log.warn("invalid params for adaptor: " + params);
         return null;
@@ -354,7 +354,7 @@
         needNewCheckpoint = true;
         try {
           adaptor.start(adaptorID, dataType, offset, DataFactory
-              .getInstance().getEventQueue(), this);
+              .getInstance().getEventQueue());
           log.info("started a new adaptor, id = " + adaptorID + " function=["+adaptor.toString()+"]");
           ChukwaAgent.agentMetrics.adaptorCount.set(adaptorsByName.size());
           ChukwaAgent.agentMetrics.addedAdaptor.inc();

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java?rev=892976&r1=892975&r2=892976&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/tools/backfilling/BackfillingLoader.java Mon Dec 21 21:05:17 2009
@@ -71,8 +71,8 @@
     File file = new File(logFile);
     connector.start();
     Adaptor adaptor = AdaptorFactory.createAdaptor(adaptorName);
-    adaptor.parseArgs(recordType, "0 " +file.getAbsolutePath());
-    adaptor.start("", recordType,  0l,queue, AdaptorManager.NULL );
+    adaptor.parseArgs(recordType, "0 " +file.getAbsolutePath(),AdaptorManager.NULL);
+    adaptor.start("", recordType,  0l,queue);
     adaptor.shutdown(AdaptorShutdownPolicy.WAIT_TILL_FINISHED);
     connector.shutdown();
     file.renameTo(new File(logFile + ".sav"));

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java?rev=892976&r1=892975&r2=892976&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/MaxRateSender.java Mon Dec 21 21:05:17 2009
@@ -41,7 +41,7 @@
   }
 
   public void start(String adaptorID, String type, long offset,
-      ChunkReceiver dest, AdaptorManager c) throws AdaptorException {
+      ChunkReceiver dest) throws AdaptorException {
     this.setName("MaxRateSender adaptor");
     this.adaptorID = adaptorID;
     this.offset = offset;
@@ -51,7 +51,7 @@
   }
 
   @Override
-  public String parseArgs(String d, String s) {
+  public String parseArgs(String d, String s,AdaptorManager c) {
     return s;
   }
 

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java?rev=892976&r1=892975&r2=892976&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/TempFileUtil.java Mon Dec 21 21:05:17 2009
@@ -109,6 +109,12 @@
      return tmpOutput;
    }
    
+
+   public static File makeTestFile(String name, int size) throws IOException {
+     return makeTestFile(name, size, new File(System.getProperty("test.build.data", "/tmp")));
+
+   }
+   
    public static File makeTestFile(File baseDir) throws IOException {
      return makeTestFile("atemp",10, baseDir);
    }

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java?rev=892976&r1=892975&r2=892976&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/TestDirTailingAdaptor.java Mon Dec 21 21:05:17 2009
@@ -115,7 +115,7 @@
     return false;
   }
   
-  private void createEmptyDir(File dir) {
+  public static void createEmptyDir(File dir) {
     if(!nukeDirContents(dir))
       dir.mkdir();
     assertTrue(dir.isDirectory() && dir.listFiles().length == 0);

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestLogRotate.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestLogRotate.java?rev=892976&r1=892975&r2=892976&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestLogRotate.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestLogRotate.java Mon Dec 21 21:05:17 2009
@@ -28,6 +28,7 @@
 import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
 import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
 import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
+import static org.apache.hadoop.chukwa.util.TempFileUtil.*;
 
 public class TestLogRotate extends TestCase {
   ChunkCatcherConnector chunks;
@@ -91,28 +92,5 @@
     Thread.sleep(2000);
   }
 
-  private File makeTestFile(String name, int size) throws IOException {
-    File tmpOutput = new File(System.getProperty("test.build.data", "/tmp"),
-        name);
-    FileOutputStream fos = new FileOutputStream(tmpOutput);
-
-    PrintWriter pw = new PrintWriter(fos);
-    for (int i = 0; i < size; ++i) {
-      pw.print(i + " ");
-      pw.println("abcdefghijklmnopqrstuvwxyz");
-    }
-    pw.flush();
-    pw.close();
-    return tmpOutput;
-  }
-
-  public static void main(String[] args) {
-    try {
-      TestLogRotate tests = new TestLogRotate();
-      tests.testLogRotate();
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
-  }
 
 }

Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java?rev=892976&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRCheckAdaptor.java Mon Dec 21 21:05:17 2009
@@ -0,0 +1,93 @@
+package org.apache.hadoop.chukwa.datacollection.adaptor.filetailer;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.connector.ChunkCatcherConnector;
+import org.apache.hadoop.conf.Configuration;
+import junit.framework.TestCase;
+import org.apache.hadoop.chukwa.datacollection.adaptor.TestDirTailingAdaptor;
+import org.apache.log4j.Level;
+
+public class TestRCheckAdaptor extends TestCase {
+  
+  ChunkCatcherConnector chunks;
+
+  public TestRCheckAdaptor() {
+    chunks = new ChunkCatcherConnector();
+    chunks.start();
+  }
+
+  public void testLogRotate() throws IOException, InterruptedException,
+      ChukwaAgent.AlreadyRunningException {
+    Configuration conf = new Configuration();
+    conf.set("chukwaAgent.control.port", "0");
+    conf.setInt("chukwaAgent.adaptor.context.switch.time", 100);
+        
+//    RCheckFTAdaptor.log.setLevel(Level.DEBUG);
+    File baseDir = new File(System.getProperty("test.build.data", "/tmp") + "/rcheck");
+    TestDirTailingAdaptor.createEmptyDir(baseDir);
+    File tmpOutput = new File(baseDir, "rotateTest.1");
+    PrintWriter pw = new PrintWriter(new FileOutputStream(tmpOutput));
+    pw.println("First");
+    pw.close();
+    Thread.sleep(1000);//to make sure mod dates are distinguishing.
+    tmpOutput = new File(baseDir, "rotateTest");
+    pw = new PrintWriter(new FileOutputStream(tmpOutput));
+    pw.println("Second");
+    pw.close();
+    
+    
+    ChukwaAgent agent = new ChukwaAgent(conf);
+    String adaptorID = agent.processAddCommand("add lr = filetailer.RCheckFTAdaptor test " + tmpOutput.getAbsolutePath() + " 0");
+    assertNotNull(adaptorID);
+    
+    Chunk c = chunks.waitForAChunk(2000);
+    assertNotNull(c);
+    assertTrue(c.getData().length == 6);
+    assertTrue("First\n".equals(new String(c.getData())));
+    c = chunks.waitForAChunk(2000);
+    assertNotNull(c);
+    assertTrue(c.getData().length == 7);    
+    assertTrue("Second\n".equals(new String(c.getData())));
+
+    pw = new PrintWriter(new FileOutputStream(tmpOutput, true));
+    pw.println("Third");
+    pw.close();
+    c = chunks.waitForAChunk(2000);
+    
+    assertNotNull(c);
+    assertTrue(c.getData().length == 6);    
+    assertTrue("Third\n".equals(new String(c.getData())));
+    Thread.sleep(1500);
+    
+    tmpOutput.renameTo(new File(baseDir, "rotateTest.2"));
+    pw = new PrintWriter(new FileOutputStream(tmpOutput, true));
+    pw.println("Fourth");
+    pw.close();
+    c = chunks.waitForAChunk(2000);
+
+    assertNotNull(c);
+    System.out.println("got " + new String(c.getData()));
+    assertTrue("Fourth\n".equals(new String(c.getData())));
+
+    Thread.sleep(1500);
+    
+    tmpOutput.renameTo(new File(baseDir, "rotateTest.3"));
+    Thread.sleep(400);
+    pw = new PrintWriter(new FileOutputStream(tmpOutput, true));
+    pw.println("Fifth");
+    pw.close();
+    c = chunks.waitForAChunk(2000);
+    assertNotNull(c);
+    System.out.println("got " + new String(c.getData()));
+    assertTrue("Fifth\n".equals(new String(c.getData())));
+
+    agent.shutdown();
+    Thread.sleep(2000);
+  }
+
+}

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java?rev=892976&r1=892975&r2=892976&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/TestRawAdaptor.java Mon Dec 21 21:05:17 2009
@@ -50,6 +50,12 @@
     System.out.println("testing lightweight fta");
     runTest("LWFTAdaptor"); 
   }
+  
+
+  public void testRotAdaptor() throws Exception {
+    System.out.println("testing lightweight fta");
+    runTest("LWFTAdaptor"); 
+  }
 
   public void runTest(String name) throws IOException, InterruptedException,
       ChukwaAgent.AlreadyRunningException {