You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by as...@apache.org on 2009/10/07 22:25:49 UTC

svn commit: r822896 - in /hadoop/chukwa/trunk: ./ contrib/chukwa-pig/ contrib/xtrace/ src/java/org/apache/hadoop/chukwa/datacollection/adaptor/ src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/ src/java/org/apache/hadoop/chukwa/datac...

Author: asrabkin
Date: Wed Oct  7 20:25:44 2009
New Revision: 822896

URL: http://svn.apache.org/viewvc?rev=822896&view=rev
Log:
CHUKWA-390. Improvements to asynchronous acknowledgement mechanism.

Added:
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestAdaptorTimeout.java
Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/contrib/chukwa-pig/   (props changed)
    hadoop/chukwa/trunk/contrib/xtrace/   (props changed)
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestDelayedAcks.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestFailedCollectorAck.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=822896&r1=822895&r2=822896&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Wed Oct  7 20:25:44 2009
@@ -58,6 +58,8 @@
 
   IMPROVEMENTS
 
+    CHUKWA-390. Improvements to asynchronous acknowledgement mechanism. (asrabkin)
+
     CHUKWA-397. Allow "all" as search pattern. (asrabkin)
 
     CHUKWA-393. Support using pig on Chunks. (asrabkin)

Propchange: hadoop/chukwa/trunk/contrib/chukwa-pig/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Oct  7 20:25:44 2009
@@ -0,0 +1 @@
+chukwa-pig.jar

Propchange: hadoop/chukwa/trunk/contrib/xtrace/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Oct  7 20:25:44 2009
@@ -0,0 +1 @@
+chukwa-xtrace.jar

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java?rev=822896&r1=822895&r2=822896&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java Wed Oct  7 20:25:44 2009
@@ -37,6 +37,13 @@
  * with k < n, it is allowed to send different values for bytes k through n the
  * second time around. However, the stream must still be parseable, assuming
  * that bytes 0-k come from the first run,and bytes k - n come from the second.
+ * 
+ * Note that Adaptor implements neither equals() nor hashCode(). It is never
+ * safe to compare two adaptors with equals(). It is safe to use adaptors
+ * as hash table keys, though two distinct Adaptors will appear as two distinct
+ * keys.  This is the desired behavior, since it means that messages intended
+ * for one Adaptor will never be received by another, even across Adaptor
+ *  restarts.
  */
 public interface Adaptor {
   /**

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java?rev=822896&r1=822895&r2=822896&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java Wed Oct  7 20:25:44 2009
@@ -31,6 +31,9 @@
  *  
  *  Mandatory first parameter is a directory.  Mandatory second parameter
  *  is the name of an adaptor to start.  
+ *  
+ *  If the specified directory does not exist, the DirTailer will continue
+ *  running, and will start tailing if the directory is later created.
  *
  */
 public class DirTailingAdaptor extends AbstractAdaptor implements Runnable {
@@ -45,8 +48,6 @@
   long scanInterval;
   String adaptorName; //name of adaptors to start
   
-  
-
   static Pattern cmd = Pattern.compile("(.+)\\s+(\\S+)");
   @Override
   public void start(long offset) throws AdaptorException {
@@ -69,10 +70,10 @@
           scanDirHierarchy(baseDir);
           lastSweepStartTime=sweepStartTime;
           control.reportCommit(this, lastSweepStartTime);
-          Thread.sleep(scanInterval);
         } catch(IOException e) {
           log.warn(e);
         }
+        Thread.sleep(scanInterval);
       }
     } catch(InterruptedException e) {
     }
@@ -82,6 +83,8 @@
    * Coded recursively.  Base case is a single non-dir file.
    */
   private void scanDirHierarchy(File dir) throws IOException {
+    if(!dir.exists())
+      return;
     if(!dir.isDirectory() ) {
       //Don't start tailing if we would have gotten it on the last pass 
       if(dir.lastModified() >= lastSweepStartTime) {
@@ -128,7 +131,6 @@
   public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
       throws AdaptorException {
     continueScanning = false;
-    
     return lastSweepStartTime;
   }
 

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java?rev=822896&r1=822895&r2=822896&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java Wed Oct  7 20:25:44 2009
@@ -34,6 +34,10 @@
  * A base class for file tailing adaptors.  
  * Intended to mandate as little policy as possible, and to use as 
  * few system resources as possible.
+ * 
+ * 
+ * If the file does not exist, this class will continue to retry quietly
+ * forever and will start tailing if it's eventually created.
  */
 public class LWFTAdaptor extends AbstractAdaptor {
   
@@ -41,6 +45,7 @@
    * This is the maximum amount we'll read from any one file before moving on to
    * the next. This way, we get quick response time for other files if one file
    * is growing rapidly.
+   * 
    */
   public static final int DEFAULT_MAX_READ_SIZE = 128 * 1024;
   public static final String MAX_READ_SIZE_OPT = 
@@ -200,11 +205,15 @@
   throws InterruptedException {
     boolean hasMoreData = false;
     try {
+      
+       //if file doesn't exist, length =0 and we just keep waiting for it.
+      //if(!toWatch.exists())
+      //  deregisterAndStop(false);
+      
       long len = toWatch.length();
       if(len < fileReadOffset) {
         //file shrank; probably some data went missing.
         handleShrunkenFile(len);
-        
       } else if(len > fileReadOffset) {
         RandomAccessFile reader = new RandomAccessFile(toWatch, "r");
         slurp(len, reader);

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java?rev=822896&r1=822895&r2=822896&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java Wed Oct  7 20:25:44 2009
@@ -21,6 +21,7 @@
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.chukwa.datacollection.DataFactory;
 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
+import org.apache.hadoop.chukwa.datacollection.collector.servlet.CommitCheckServlet;
 import org.apache.hadoop.chukwa.datacollection.sender.AsyncAckSender;
 import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
 import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
@@ -28,7 +29,9 @@
 
 public class AdaptorResetThread extends Thread {
   
-  Logger log = Logger.getLogger(AdaptorResetThread.class);
+  static Logger log = Logger.getLogger(AdaptorResetThread.class);
+  public static final String TIMEOUT_OPT = "connector.commitpoll.timeout";
+  
   
   int resetCount = 0;
   private static class AdaptorStat {
@@ -40,24 +43,34 @@
     }
   }
 
+
+  int timeout = 15*60 * 1000; //default to wait fifteen minutes for an ack
+               //note that this is overridden using the poll and rotate periods.
+  
   Map<Adaptor, AdaptorStat> status;
-  int timeout = 10*60 * 1000; //default to wait ten minutes for an ack
   ChukwaAgent agent;
-  public static final String TIMEOUT_OPT = "connector.commitpoll.timeout";
   private volatile boolean running = true;
   
   public AdaptorResetThread(Configuration conf, ChukwaAgent a) {
-    timeout = 2* conf.getInt(SeqFileWriter.ROTATE_INTERVAL_OPT, timeout/2);
-      //default to 2x rotation interval, if rotate interval is defined.
-    timeout = conf.getInt(TIMEOUT_OPT, timeout);
-      //or explicitly set timeout
+        //
+    timeout =  conf.getInt(SeqFileWriter.ROTATE_INTERVAL_OPT, timeout/3) 
+        + conf.getInt(AsyncAckSender.POLLPERIOD_OPT, timeout/3) 
+        + conf.getInt(CommitCheckServlet.SCANPERIOD_OPT, timeout/3);
+    
+    timeout = conf.getInt(TIMEOUT_OPT, timeout); //unless overridden
+     
     status = new LinkedHashMap<Adaptor, AdaptorStat>();
     this.agent = a;
     this.setDaemon(true);
   }
   
-  public void resetTimedOutAdaptors(int timeSinceLastCommit) {
-    
+  /**
+   * Resets all adaptors with outstanding data more than timeSinceLastCommit old.
+   * @param timeSinceLastCommit
+   * @return the number of reset adaptors
+   */
+  public int resetTimedOutAdaptors(int timeSinceLastCommit) {
+    int resetThisTime = 0;
     long timeoutThresh = System.currentTimeMillis() - timeSinceLastCommit;
     List<Adaptor> toResetList = new ArrayList<Adaptor>(); //also contains stopped 
     //adaptors
@@ -67,45 +80,55 @@
         ChukwaAgent.Offset off = agent.offset(ent.getKey());
         if(off == null) {
           toResetList.add(ent.getKey());
-        } else if(stat.maxByteSent > off.offset 
-            && stat.lastCommitTime < timeoutThresh) {
+        } else if(stat.maxByteSent > off.offset  //some data outstanding
+            && stat.lastCommitTime < timeoutThresh) { //but no progress made
           toResetList.add(ent.getKey());
-          log.warn("restarting " + off.id + " at " + off.offset + " due to collector timeout");
+          log.warn("restarting " + off.id + " at " + off.offset + " due to timeout; "+
+              "last commit was ");
         }
       }
     }
     
     for(Adaptor a: toResetList) {
-      status.remove(a);
+      status.remove(a); //it'll get added again when adaptor resumes, if it does
       ChukwaAgent.Offset off = agent.offset(a);
       if(off != null) {
         agent.stopAdaptor(off.id, false);
         
-          //We can do this safely if we're called in the same thread as the sends,
-        //since then we'll be synchronous with sends, and guaranteed to be
-        //interleaved between two successive sends
-        //DataFactory.getInstance().getEventQueue().purgeAdaptor(a);
-        
         String a_status = a.getCurrentStatus();
         agent.processAddCommand("add " + off.id + "= " + a.getClass().getCanonicalName()
              + " "+ a_status + " " + off.offset);
-        resetCount ++;
+        resetThisTime ++;
         //will be implicitly added to table once adaptor starts sending
       } 
-       //implicitly do nothing if adaptor was stopped
+       //implicitly do nothing if adaptor was stopped. We already removed
+      //its entry from the status table.
     }
+    resetCount += resetThisTime;
+    return resetThisTime;
   }
   
   public synchronized void reportPending(List<AsyncAckSender.CommitListEntry> delayedCommits) {
+    long now = System.currentTimeMillis();
     for(AsyncAckSender.CommitListEntry dc: delayedCommits) {
       AdaptorStat a = status.get(dc.adaptor);
       if(a == null)
-        status.put(dc.adaptor, new AdaptorStat(0, dc.uuid));
+        status.put(dc.adaptor, new AdaptorStat(now, dc.uuid));
       else if(a.maxByteSent < dc.uuid)
           a.maxByteSent = dc.uuid;
     }
   }
   
+  public synchronized void reportCommits(Set<Adaptor> commits) {
+    long now = System.currentTimeMillis();
+    for(Adaptor a: commits) {
+      if(status.containsKey(a)) {
+        status.get(a).lastCommitTime = now;
+      } else
+        log.warn("saw commit for adaptor " + a + " before seeing sends"); 
+    }
+  }
+  
   public void reportStop(Adaptor a) {
     status.remove(a);
   }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=822896&r1=822895&r2=822896&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java Wed Oct  7 20:25:44 2009
@@ -43,6 +43,7 @@
 import org.apache.hadoop.chukwa.datacollection.agent.metrics.AgentMetrics;
 import org.apache.hadoop.chukwa.datacollection.connector.Connector;
 import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
+import org.apache.hadoop.chukwa.datacollection.sender.ChukwaHttpSender.CommitListEntry;
 import org.apache.hadoop.chukwa.datacollection.test.ConsoleOutConnector;
 import org.apache.hadoop.chukwa.util.AdaptorNamingUtils;
 import org.apache.hadoop.chukwa.util.DaemonWatcher;
@@ -70,7 +71,7 @@
   Connector connector = null;
 
   // doesn't need an equals(), comparator, etc
-  static class Offset {
+  public static class Offset {
     public Offset(long l, String id) {
       offset = l;
       this.id = id;
@@ -78,6 +79,9 @@
 
     final String id;
     volatile long offset;
+    public long offset() {
+      return this.offset;
+    }
   }
 
   public static class AlreadyRunningException extends Exception {
@@ -571,7 +575,7 @@
     }
   }
 
-  Offset offset(Adaptor a) {
+  public Offset offset(Adaptor a) {
     Offset o = adaptorPositions.get(a);
     return o;
   }
@@ -646,19 +650,16 @@
   }
 
   /**
-   * Returns the last offset at which a given adaptor was checkpointed
-   * 
-   * @param a the adaptor in question
-   * @return that adaptor's last-checkpointed offset
-   */
-  long getOffset(Adaptor a) { //FIXME: do we need this method?
-    return adaptorPositions.get(a).offset;
-  }
-
-  /**
    * Returns the control socket for this agent.
    */
   private AgentControlSocketListener getControlSock() {
     return controlSock;
   }
+
+  public String getAdaptorName(Adaptor initiator) {
+    Offset o = adaptorPositions.get(initiator);
+    if(o != null)
+      return o.id;
+    else return null;
+  }
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java?rev=822896&r1=822895&r2=822896&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java Wed Oct  7 20:25:44 2009
@@ -28,6 +28,7 @@
 import org.apache.log4j.Logger;
 import java.util.*;
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
 import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
 import org.apache.hadoop.chukwa.extraction.archive.SinkArchiver;
 import org.apache.hadoop.conf.Configuration;
@@ -129,7 +130,7 @@
       oldEntries = new PriorityQueue<PurgeTask>();
       checkInterval = conf.getInt(SCANPERIOD_OPT, checkInterval);
       
-      String sinkPath = conf.get("chukwaCollector.outputDir", "/chukwa/logs");
+      String sinkPath = conf.get(SeqFileWriter.OUTPUT_DIR_OPT, "/chukwa/logs");
       pathsToSearch.add(new Path(sinkPath));
       
       String additionalSearchPaths = conf.get(SCANPATHS_OPT, "");

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java?rev=822896&r1=822895&r2=822896&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java Wed Oct  7 20:25:44 2009
@@ -58,28 +58,39 @@
    */
   public static class DelayedCommit extends CommitListEntry implements Comparable<DelayedCommit> {
     final String fname;
-    final long offset;
-    public DelayedCommit(Adaptor a, long uuid, String fname, long offset) {
-      super(a, uuid);
+    long fOffset;
+    final String aName;
+    public DelayedCommit(Adaptor a, long uuid, long len, String fname, 
+        long offset, String aName) {
+      super(a, uuid, len);
       this.fname = fname;
-      this.offset = offset;
+      this.fOffset = offset;
+      this.aName = aName;
     }
 
     @Override
     public int hashCode() {
-      return super.hashCode() ^ fname.hashCode() ^ (int)(offset) ^ (int) (offset >> 32);
+      return super.hashCode() ^ fname.hashCode() ^ (int)(fOffset) ^ (int) (fOffset >> 32);
     }
     
+    //sort by adaptor name first, then by start offset
+    //note that returning 1 means this is "greater" than RHS
     public int compareTo(DelayedCommit o) {
-      if(o.uuid < uuid)
+      int c = o.aName.compareTo(this.aName);
+      if(c != 0)
+        return c;
+      c = fname.compareTo(this.fname);
+      if(c != 0)
+        return c;
+      if(o.start < start)
         return 1;
-      else if(o.uuid > uuid)
+      else if(o.start > start)
         return -1;
       else return 0;
     }
     
     public String toString() {
-      return adaptor +" commits up to " + uuid + " when " + fname + " hits " + offset;
+      return adaptor +" commits from" + start + " to " + uuid + " when " + fname + " hits " + fOffset;
     }
   }
   
@@ -88,35 +99,29 @@
   final ChukwaAgent agent;
   
   /*
-   * The merge table stores commits that we're expecting, before they're handed
-   * to the CommitPollThread.  There will be only one entry for each adaptor.
+   * The list of commits that we're expecting.
+   * This is the structure used to pass the list to the CommitPollThread.  
+   * Adjacent commits to the same file will be coalesced.
    * 
-   * values are a collection of delayed commits, one per adaptor.
-   * keys are unspecified
    */
-  final Map<String, DelayedCommit> mergeTable;
+  final List<DelayedCommit> mergedList;
   
   /**
    * Periodically scans a subset of the collectors, looking for committed files.
    * This way, not every collector is pestering the namenode with periodic lses.
    */
-  static final class CommitPollThread extends Thread {
+  final class CommitPollThread extends Thread {
     private ChukwaHttpSender scanPath;
     private int pollPeriod = 1000 * 30;
 
 
     private final Map<String, PriorityQueue<DelayedCommit>> pendingCommits;
-    private final Map<String, DelayedCommit> mergeTable;
-    private final ChukwaAgent agent;
 
-    CommitPollThread(Configuration conf, ChukwaAgent agent, 
-        Map<String, DelayedCommit> mergeTable, Iterator<String> tryList) {
+    CommitPollThread(Configuration conf, Iterator<String> tryList) {
       pollPeriod = conf.getInt(POLLPERIOD_OPT, pollPeriod);
       scanPath = new ChukwaHttpSender(conf);
       scanPath.setCollectors(tryList);
       pendingCommits = new HashMap<String, PriorityQueue<DelayedCommit>>();
-      this.mergeTable = mergeTable;
-      this.agent = agent;
     }
 
     private volatile boolean running = true;
@@ -144,18 +149,18 @@
      * from the same thread that will later check for commits
      */
     private void mergePendingTable() {
-      synchronized(mergeTable) {
-        for(DelayedCommit dc: mergeTable.values()) {
+      synchronized(mergedList) {
+        for(DelayedCommit dc:mergedList) {
           
-          PriorityQueue<DelayedCommit> map = pendingCommits.get(dc.fname);
-          if(map == null) {
-            map = new PriorityQueue<DelayedCommit>();
-            pendingCommits.put(dc.fname, map);
+          PriorityQueue<DelayedCommit> pendList = pendingCommits.get(dc.fname);
+          if(pendList == null) {
+            pendList = new PriorityQueue<DelayedCommit>();
+            pendingCommits.put(dc.fname, pendList);
           }
-          map.add(dc);
+          pendList.add(dc);
         }
-        mergeTable.clear();
-      }
+        mergedList.clear();
+      } //end synchronized
     }
     
     Pattern respLine = Pattern.compile("<li>(.*) ([0-9]+)</li>");
@@ -177,24 +182,29 @@
         if(delayedOnFile == null)
           continue;
        
+        HashSet<Adaptor> committed = new HashSet<Adaptor>();
         while(!delayedOnFile.isEmpty()) {
           DelayedCommit fired = delayedOnFile.element();
-          if(fired.offset > committedOffset)
+          if(fired.fOffset > committedOffset)
             break;
-          else
+          else {
+            ChukwaAgent.Offset o = agent.offset(fired.adaptor);
+            if(o != null && fired.start > o.offset()) {
+              log.error("can't commit without ordering assumption");
+              break; //don't commit
+            }
             delayedOnFile.remove();
-          String s = agent.reportCommit(fired.adaptor, fired.uuid);
-          //TODO: if s == null, then the adaptor has been stopped.
-          //should we stop sending acks?
-          log.info("COMMIT to "+ committedOffset+ " on "+ path+ ", updating " +s);
+            String s = agent.reportCommit(fired.adaptor, fired.uuid);
+            committed.add(fired.adaptor);
+            //TODO: if s == null, then the adaptor has been stopped.
+            //should we stop sending acks?
+            log.info("COMMIT to "+ committedOffset+ " on "+ path+ ", updating " +s);
+          }
         }
+        adaptorReset.reportCommits(committed);
       }
     }
 
-    void setScannableCollectors(Iterator<String> collectorURLs) {
-      // TODO Auto-generated method stub
-      
-    }
   } 
   
   CommitPollThread pollThread;
@@ -208,9 +218,10 @@
     log.info("delayed-commit processing enabled");
     agent = a;
     
-    mergeTable = new LinkedHashMap<String, DelayedCommit>();
+    mergedList = new ArrayList<DelayedCommit>();
     this.conf = conf;
     adaptorReset = new AdaptorResetThread(conf, a);
+    adaptorReset.start();
     //initialize the commitpoll later, once we have the list of collectors
   }
   
@@ -239,7 +250,7 @@
        tryList = l.iterator();
    }
 
-   pollThread = new CommitPollThread(conf, agent, mergeTable, tryList);
+   pollThread = new CommitPollThread(conf, tryList);
    pollThread.setDaemon(true);
    pollThread.start();
   }
@@ -252,19 +263,24 @@
    *  read by the CommitPollThread when it figures out what commits are expected.
    */
   private void delayCommits(List<DelayedCommit> delayed) {
-    String[] keys = new String[delayed.size()];
-    int i = 0;
-    for(DelayedCommit c: delayed) {
-      String adaptorKey = c.adaptor.hashCode() + "_" + c.adaptor.getCurrentStatus().hashCode();
-      keys[i++] = c.fname +"::" + adaptorKey;
-    }
-    synchronized(mergeTable) {
-      for(i = 0; i < keys.length; ++i) {
-        DelayedCommit cand = delayed.get(i);
-        DelayedCommit cur = mergeTable.get(keys[i]);
-        if(cur == null || cand.offset > cur.offset) 
-          mergeTable.put(keys[i], cand);
+    Collections.sort(delayed);
+    
+    synchronized(mergedList) {
+      DelayedCommit region =null;
+      for(DelayedCommit cur: delayed) {
+        if(region == null)
+          region = cur;
+        else if((cur.adaptor == region.adaptor) &&
+            cur.fname.equals(region.fname) && (cur.start <= region.uuid)) {
+          //since the list is sorted, region.start < cur.start
+          region.uuid = Math.max(region.uuid, cur.uuid); //merge
+          region.fOffset = Math.max(region.fOffset, cur.fOffset);
+        } else {
+          mergedList.add(region);
+          region= cur;
+        }
       }
+      mergedList.add(region);
     }
   }
   
@@ -276,8 +292,10 @@
   throws IOException, InterruptedException {
     adaptorReset.reportPending(expectedCommitResults);
     List<String> resp = reliablySend(method, ServletCollector.PATH);
-    List<DelayedCommit> toDelay = new ArrayList<DelayedCommit>();
+      //expect most of 'em to be delayed
+    List<DelayedCommit> toDelay = new ArrayList<DelayedCommit>(resp.size());
     ArrayList<CommitListEntry> result =  new ArrayList<CommitListEntry>();
+    
     for(int i = 0; i < resp.size(); ++i)  {
       if(resp.get(i).startsWith(ServletCollector.ACK_PREFIX))
         result.add(expectedCommitResults.get(i));
@@ -288,8 +306,11 @@
           log.warn("unexpected response: "+ resp.get(i));
         else
           log.info("waiting for " + m.group(1) + " to hit " + m.group(2) + " before committing "+ cle.adaptor);
-        toDelay.add(new DelayedCommit(cle.adaptor, cle.uuid, m.group(1), 
-            Long.parseLong(m.group(2))));
+        
+        String name = agent.getAdaptorName(cle.adaptor);
+        if(name != null)//null name implies adaptor no longer running
+          toDelay.add(new DelayedCommit(cle.adaptor, cle.uuid, cle.start, m.group(1), 
+                Long.parseLong(m.group(2)), name));
       }
     }
     delayCommits(toDelay);

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java?rev=822896&r1=822895&r2=822896&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java Wed Oct  7 20:25:44 2009
@@ -89,10 +89,11 @@
   public static class CommitListEntry {
     public Adaptor adaptor;
     public long uuid;
-
-    public CommitListEntry(Adaptor a, long uuid) {
+    public long start; //how many bytes of stream
+    public CommitListEntry(Adaptor a, long uuid, long start) {
       adaptor = a;
       this.uuid = uuid;
+      this.start = start;
     }
   }
 
@@ -183,7 +184,8 @@
       // store a CLE for this chunk which we will use to ack this chunk to the
       // caller of send()
       // (e.g. the agent will use the list of CLE's for checkpointing)
-      commitResults.add(new CommitListEntry(c.getInitiator(), c.getSeqID()));
+      commitResults.add(new CommitListEntry(c.getInitiator(), c.getSeqID(), 
+         c.getSeqID() - c.getData().length));
     }
     toSend.clear();
 

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java?rev=822896&r1=822895&r2=822896&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java Wed Oct  7 20:25:44 2009
@@ -53,6 +53,7 @@
   
   public static final String STAT_PERIOD_OPT = "chukwaCollector.stats.period";
   public static final String ROTATE_INTERVAL_OPT = "chukwaCollector.rotateInterval";
+  public static final String OUTPUT_DIR_OPT= "chukwaCollector.outputDir";
   static String localHostAddr = null;
   
   final Semaphore lock = new Semaphore(1, true);
@@ -94,7 +95,7 @@
   }
   
   public void init(Configuration conf) throws WriterException {
-    outputDir = conf.get("chukwaCollector.outputDir", "/chukwa");
+    outputDir = conf.get(OUTPUT_DIR_OPT, "/chukwa");
 
     this.conf = conf;
 

Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestAdaptorTimeout.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestAdaptorTimeout.java?rev=822896&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestAdaptorTimeout.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestAdaptorTimeout.java Wed Oct  7 20:25:44 2009
@@ -0,0 +1,55 @@
+package org.apache.hadoop.chukwa.datacollection.collector;
+import java.io.File;
+import org.apache.hadoop.chukwa.datacollection.adaptor.TestDirTailingAdaptor;
+import org.apache.hadoop.chukwa.datacollection.agent.AdaptorResetThread;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.collector.servlet.CommitCheckServlet;
+import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
+import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
+import org.apache.hadoop.chukwa.datacollection.sender.AsyncAckSender;
+import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
+import org.apache.hadoop.chukwa.util.ConstRateAdaptor;
+import org.apache.hadoop.conf.Configuration;
+import org.mortbay.jetty.Server;
+import junit.framework.TestCase;
+
+
+
+public class TestAdaptorTimeout extends TestCase {
+  static final int PORTNO = 9997;
+  static final int TEST_DURATION_SECS = 30;
+  static int SEND_RATE = 10* 1000; //bytes/sec
+
+  public void testAdaptorTimeout() throws Exception {
+    Configuration conf = new Configuration();
+
+    String outputDirectory = TestDelayedAcks.buildConf(conf);
+    conf.setInt(AdaptorResetThread.TIMEOUT_OPT, 1000);
+    ServletCollector collector = new ServletCollector(conf);
+    Server collectorServ = TestDelayedAcks.startCollectorOnPort(conf, PORTNO, collector);
+    Thread.sleep(1000);
+    
+    ChukwaAgent agent = new ChukwaAgent(conf);
+    HttpConnector conn = new HttpConnector(agent, "http://localhost:"+PORTNO+"/");
+    conn.start();
+    String resp = agent.processAddCommand("add constSend = " + ConstRateAdaptor.class.getCanonicalName() + 
+        " testData "+ SEND_RATE + " 0");
+    assertTrue("constSend".equals(resp));
+    Thread.sleep(TEST_DURATION_SECS * 1000);
+    
+    AsyncAckSender sender = (AsyncAckSender)conn.getSender();
+    int resets = sender.adaptorReset.getResetCount();
+    System.out.println(resets + " resets");
+    assertTrue(resets > 0);
+    
+    agent.shutdown();
+    collectorServ.stop();
+    conn.shutdown();
+    Thread.sleep(5000); //for collector to shut down
+    
+    long dups = TestFailedCollectorAck.checkDirs(conf, conf.get(SeqFileWriter.OUTPUT_DIR_OPT));
+    assertTrue(dups > 0);
+    TestDirTailingAdaptor.nukeDirContents(new File(outputDirectory));
+  }
+
+}

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestDelayedAcks.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestDelayedAcks.java?rev=822896&r1=822895&r2=822896&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestDelayedAcks.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestDelayedAcks.java Wed Oct  7 20:25:44 2009
@@ -84,16 +84,17 @@
     Chunk c1 = chunks.waitForAChunk();
     assertNotNull(c1);
     List<CommitListEntry> pendingAcks = new ArrayList<CommitListEntry>();
-    pendingAcks.add(new DelayedCommit(c1.getInitiator(), c1.getSeqID(), "foo", c1.getSeqID()));
+    pendingAcks.add(new DelayedCommit(c1.getInitiator(), c1.getSeqID(),
+        c1.getData().length, "foo", c1.getSeqID(), agent.getAdaptorName(c1.getInitiator())));
     restart.reportPending(pendingAcks);
 
     assertEquals(len, c1.getData().length);
     Thread.sleep(ACK_TIMEOUT*2);
-    restart.resetTimedOutAdaptors(ACK_TIMEOUT);
+    int resetCount = restart.resetTimedOutAdaptors(ACK_TIMEOUT);
     Chunk c2 = chunks.waitForAChunk(1000);
     assertNotNull(c2);
     assertEquals(len, c2.getData().length);
-    assertTrue(restart.getResetCount() > 0);
+    assertTrue(resetCount > 0);
     agent.shutdown();
 //start an adaptor -- chunks should appear in the connector
     //wait for timeout.  More chunks should appear.
@@ -119,7 +120,7 @@
     String outputDirectory = tempDir.getPath() + "/test_DA" + System.currentTimeMillis();
 
     String seqWriterOutputDir = outputDirectory +"/seqWriter/seqOutputDir";
-    conf.set("chukwaCollector.outputDir", seqWriterOutputDir );
+    conf.set(SeqFileWriter.OUTPUT_DIR_OPT, seqWriterOutputDir );
 
     writer.init(conf);
     ArrayList<Chunk> oneChunk = new ArrayList<Chunk>();
@@ -178,7 +179,7 @@
     conf.setInt("chukwaCollector.rotateInterval", ROTATEPERIOD);
     conf.set("writer.hdfs.filesystem", "file:///");
     String seqWriterOutputDir = outputDirectory +"/chukwa_sink";
-    conf.set("chukwaCollector.outputDir", seqWriterOutputDir );
+    conf.set(SeqFileWriter.OUTPUT_DIR_OPT, seqWriterOutputDir );
     conf.setInt(AsyncAckSender.POLLPERIOD_OPT, CLIENT_SCANPERIOD);
     conf.setInt(CommitCheckServlet.SCANPERIOD_OPT, SERVER_SCANPERIOD);
     conf.setBoolean(HttpConnector.ASYNC_ACKS_OPT, true);

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestFailedCollectorAck.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestFailedCollectorAck.java?rev=822896&r1=822895&r2=822896&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestFailedCollectorAck.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestFailedCollectorAck.java Wed Oct  7 20:25:44 2009
@@ -41,9 +41,9 @@
     sinkB.mkdir();
     conf.set(CommitCheckServlet.SCANPATHS_OPT, sinkA.getCanonicalPath()
         + "," + sinkB.getCanonicalPath());
-    conf.set("chukwaCollector.outputDir", sinkA.getCanonicalPath() );
+    conf.set(SeqFileWriter.OUTPUT_DIR_OPT, sinkA.getCanonicalPath() );
     ServletCollector collector1 = new ServletCollector(new Configuration(conf));
-    conf.set("chukwaCollector.outputDir",sinkB.getCanonicalPath() );
+    conf.set(SeqFileWriter.OUTPUT_DIR_OPT,sinkB.getCanonicalPath() );
     ServletCollector collector2 = new ServletCollector(conf);
     Server collector1_s = TestDelayedAcks.startCollectorOnPort(conf, PORTNO+1, collector1);
     Server collector2_s = TestDelayedAcks.startCollectorOnPort(conf, PORTNO+2, collector2);
@@ -86,7 +86,8 @@
     }
   }
   
-  public void checkDirs(Configuration conf, String paths) throws IOException {
+  //returns number of dup chunks
+  public static long checkDirs(Configuration conf, String paths) throws IOException {
     
     ArrayList<Path> toScan = new ArrayList<Path>();
     ArrayList<ByteRange> bytes = new ArrayList<ByteRange>();
@@ -100,9 +101,7 @@
     for(Path p: toScan) {
       
       FileStatus[] dataSinkFiles = localfs.listStatus(p, SinkArchiver.DATA_SINK_FILTER);
-      for(FileStatus fstatus: dataSinkFiles) {
-        System.out.println(fstatus.getPath().getName());
-      }
+   
       for(FileStatus fstatus: dataSinkFiles) {
         if(!fstatus.getPath().getName().endsWith(".done"))
           continue;
@@ -129,6 +128,7 @@
         System.out.println(s);
     }
     assertEquals(0, sm.missingBytes);
+    return sm.dupBytes;
   }
 
 }

Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java?rev=822896&r1=822895&r2=822896&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java Wed Oct  7 20:25:44 2009
@@ -80,7 +80,7 @@
       confSeqWriter.set("chukwaCollector.rotateInterval", "300000");
       confSeqWriter.set("writer.hdfs.filesystem", "file:///");
       String seqWriterOutputDir = outputDirectory +"/seqWriter/seqOutputDir";
-      confSeqWriter.set("chukwaCollector.outputDir", seqWriterOutputDir );
+      confSeqWriter.set(SeqFileWriter.OUTPUT_DIR_OPT, seqWriterOutputDir );
       
       seqWriter.init(confSeqWriter);
       Thread.sleep(5000);