You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by as...@apache.org on 2009/08/28 17:38:18 UTC

svn commit: r808922 - in /hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection: adaptor/FileAdaptor.java sender/ChukwaHttpSender.java sender/ChukwaSender.java sender/RetryListOfCollectors.java test/FileTailerStressTest.java

Author: asrabkin
Date: Fri Aug 28 15:38:17 2009
New Revision: 808922

URL: http://svn.apache.org/viewvc?rev=808922&view=rev
Log:
CHUKWA-379. More refactoring

Modified:
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaSender.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java?rev=808922&r1=808921&r2=808922&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java Fri Aug 28 15:38:17 2009
@@ -137,7 +137,7 @@
    */
   private long offsetOfFirstByte = 0;
 
-  public void start( long bytes) {
+  public void start(long bytes) {
     // in this case params = filename
     log.info("adaptor id: " + adaptorID + " started file adaptor on file "
         + toWatch);
@@ -147,8 +147,6 @@
         DEFAULT_TIMEOUT_PERIOD);
     this.timeOut = startTime + TIMEOUT_PERIOD;
     
-
-    
     tailer.addFileAdaptor(this);
   }
 

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java?rev=808922&r1=808921&r2=808922&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java Fri Aug 28 15:38:17 2009
@@ -66,6 +66,9 @@
   final int MAX_RETRIES_PER_COLLECTOR; // fast retries, in http client
   final int SENDER_RETRIES;
   final int WAIT_FOR_COLLECTOR_REBOOT;
+  final int COLLECTOR_TIMEOUT;
+  
+  public static final String COLLECTOR_TIMEOUT_OPT = "chukwaAgent.sender.collectorTimeout";
   // FIXME: this should really correspond to the timer in RetryListOfCollectors
 
   static final HttpSenderMetrics metrics = new HttpSenderMetrics("ChukwaAgent", "chukwaHttpSender");
@@ -128,13 +131,12 @@
     // setup default collector
     ArrayList<String> tmp = new ArrayList<String>();
     this.collectors = tmp.iterator();
-    log.info("added a single collector to collector list in ConnectorClient constructor, it's hasNext is now: "
-            + collectors.hasNext());
 
     MAX_RETRIES_PER_COLLECTOR = c.getInt("chukwaAgent.sender.fastRetries", 4);
     SENDER_RETRIES = c.getInt("chukwaAgent.sender.retries", 144000);
     WAIT_FOR_COLLECTOR_REBOOT = c.getInt("chukwaAgent.sender.retryInterval",
         20 * 1000);
+    COLLECTOR_TIMEOUT = c.getInt(COLLECTOR_TIMEOUT_OPT, 30*1000);
   }
 
   /**
@@ -150,7 +152,7 @@
       if (collectors.hasNext()) {
         currCollector = collectors.next();
       } else
-        log.error("No collectors to try in send(), not even trying to do doPost()");
+        log.error("No collectors to try in send(), won't even try to do doPost()");
     }
   }
 
@@ -226,10 +228,15 @@
         ChukwaHttpSender.metrics.httpThrowable.inc();
         if (collectors.hasNext()) {
           ChukwaHttpSender.metrics.collectorRollover.inc();
-          failedCollector(currCollector);
+          boolean repeatPost = failedCollector(currCollector);
           currCollector = collectors.next();
-          log.info("Found a new collector to roll over to, retrying HTTP Post to collector "
-                  + currCollector);
+          if(repeatPost)
+            log.info("Found a new collector to roll over to, retrying HTTP Post to collector "
+                + currCollector);
+          else {
+            log.info("Using " + currCollector + " in the future, but not retrying this post");
+            break;
+          }
         } else {
           if (retries > 0) {
             log.warn("No more collectors to try rolling over to; waiting "
@@ -252,10 +259,12 @@
 
   /**
    * A hook for taking action when a collector is declared failed.
+   * Returns whether to retry the current post (true) or discard it (false).
    * @param downCollector
    */
-  protected void failedCollector(String downCollector) {
+  protected boolean failedCollector(String downCollector) {
     log.debug("declaring "+ downCollector + " down");
+    return true;
   }
 
   /**
@@ -275,7 +284,7 @@
           }
         });
 
-    pars.setParameter(HttpMethodParams.SO_TIMEOUT, new Integer(30000));
+    pars.setParameter(HttpMethodParams.SO_TIMEOUT, new Integer(COLLECTOR_TIMEOUT));
 
     method.setParams(pars);
     method.setPath(dest);
@@ -318,4 +327,8 @@
     }
     return resp;
   }
+
+  @Override
+  public void stop() {
+  }
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaSender.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaSender.java?rev=808922&r1=808921&r2=808922&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaSender.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaSender.java Fri Aug 28 15:38:17 2009
@@ -22,5 +22,7 @@
       throws InterruptedException, java.io.IOException;
 
   public void setCollectors(Iterator<String> collectors);
+  
+  public void stop();
 
 }

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java?rev=808922&r1=808921&r2=808922&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java Fri Aug 28 15:38:17 2009
@@ -46,10 +46,7 @@
 
   public RetryListOfCollectors(File collectorFile, Configuration conf)
       throws IOException {
-    collectors = new ArrayList<String>();
-    this.conf = conf;
-    portNo = conf.get("chukwaCollector.http.port", "8080");
-    maxRetryRateMs = conf.getInt(RETRY_RATE_OPT, 15 * 1000);
+    this(conf);
     try {
       BufferedReader br = new BufferedReader(new FileReader(collectorFile));
       String line, parsedline;
@@ -101,16 +98,22 @@
    * @param maxRetryRateMs
    */
   public RetryListOfCollectors(final List<String> collectors, Configuration conf) {
+    this(conf);
+    this.collectors.addAll(collectors);
+    //we don't shuffle the list here -- this constructor is only used for test purposes
+  }
+  
+  public RetryListOfCollectors(Configuration conf) {
+    collectors = new ArrayList<String>();
+    this.conf = conf;
+    portNo = conf.get("chukwaCollector.http.port", "8080");
     maxRetryRateMs = conf.getInt(RETRY_RATE_OPT, 15 * 1000);
     lastLookAtFirstNode = 0;
-    this.collectors = new ArrayList<String>();
-    this.collectors.addAll(collectors);
-    shuffleList();
   }
 
   // for now, use a simple O(n^2) algorithm.
   // safe, because we only do this once, and on smallish lists
-  private void shuffleList() {
+  public void shuffleList() {
     ArrayList<String> newList = new ArrayList<String>();
     Random r = new java.util.Random();
     while (!collectors.isEmpty()) {
@@ -138,12 +141,8 @@
       return null;
   }
 
-  public String getRandomCollector() {
-    return collectors.get((int) java.lang.Math.random() * collectors.size());
-  }
-
-  public void add(URL collector) {
-    collectors.add(collector.toString());
+  public void add(String collector) {
+    collectors.add(collector);
   }
 
   public void remove() {

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java?rev=808922&r1=808921&r2=808922&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java Fri Aug 28 15:38:17 2009
@@ -25,6 +25,7 @@
 import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
 import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
 import org.apache.hadoop.chukwa.datacollection.writer.ConsoleWriter;
+import org.apache.hadoop.conf.Configuration;
 import org.mortbay.jetty.Server;
 import org.mortbay.jetty.servlet.Context;
 import org.mortbay.jetty.servlet.ServletHolder;
@@ -72,9 +73,10 @@
       Server server = new Server(9990);
       Context root = new Context(server, "/", Context.SESSIONS);
 
-      ServletCollector.setWriter(new ConsoleWriter(true));
-      root.addServlet(new ServletHolder(new ServletCollector(
-          new ChukwaConfiguration(true))), "/*");
+      Configuration conf =  new Configuration();
+      ServletCollector collector = new ServletCollector(conf);
+      collector.setWriter(new ConsoleWriter(true));
+      root.addServlet(new ServletHolder(collector), "/*");
       server.start();
       server.setStopAtShutdown(false);