You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/12/10 07:48:29 UTC

svn commit: r725010 - in /hadoop/core/trunk: CHANGES.txt src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java

Author: cdouglas
Date: Tue Dec  9 22:48:28 2008
New Revision: 725010

URL: http://svn.apache.org/viewvc?rev=725010&view=rev
Log:
HADOOP-4805. Remove black list collector from Chukwa Agent HTTP Sender. Contributed by Eric Yang.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=725010&r1=725009&r2=725010&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Dec  9 22:48:28 2008
@@ -329,6 +329,9 @@
     HADOOP-4708. Add binaries missed in the initial checkin for Chukwa. (Eric
     Yang via cdouglas)
 
+    HADOOP-4805. Remove black list collector from Chukwa Agent HTTP Sender.
+    (Eric Yang via cdouglas)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java?rev=725010&r1=725009&r2=725010&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java Tue Dec  9 22:48:28 2008
@@ -26,10 +26,8 @@
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.HttpException;
@@ -41,7 +39,6 @@
 import org.apache.commons.httpclient.methods.RequestEntity;
 import org.apache.commons.httpclient.params.HttpMethodParams;
 import org.apache.hadoop.chukwa.Chunk;
-import org.apache.hadoop.chukwa.datacollection.DataFactory;
 import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.log4j.Logger;
@@ -50,8 +47,7 @@
  * Encapsulates all of the http setup and connection details needed for
  * chunks to be delivered to a collector.
  * <p>
- * On error, tries the list of available collectors, pauses for a minute, 
- * and then repeats.
+ * On error, tries the list of available collectors, pauses for a minute, and then repeats.
  * </p>
  * <p> Will wait forever for collectors to come up. </p>
  */
@@ -60,13 +56,13 @@
   static final int SENDER_RETRIES = 14440; 
   static final int WAIT_FOR_COLLECTOR_REBOOT = 20 * 1000; 
     //FIXME: this should really correspond to the timer in RetryListOfCollectors
-  static final int BLACK_LIST_TIME = 300 * 1000;
+  
   static Logger log = Logger.getLogger(ChukwaHttpSender.class);
   static HttpClient client = null;
   static MultiThreadedHttpConnectionManager connectionManager = null;
   static String currCollector = null;
 
-  protected static ConcurrentHashMap<Long, String> blackList = null; 
+  
   protected Iterator<String> collectors;
   
   static
@@ -119,11 +115,10 @@
   }
 
   public ChukwaHttpSender(){
+    //setup default collector
     ArrayList<String> tmp = new ArrayList<String>();
     this.collectors = tmp.iterator();
-    ConcurrentHashMap<Long, String> tmpHash = new ConcurrentHashMap<Long, String>();
-    this.blackList = tmpHash;
-    log.info("setting collectors to an empty iterator");
+    log.info("added a single collector to collector list in ConnectorClient constructor, it's hasNext is now: " + collectors.hasNext());
 
   }
   
@@ -139,22 +134,21 @@
    * @param collectors
    */
   public void setCollectors(Iterator<String> collectors){
-    this.collectors = collectors;
-    this.blackList.clear();
-    //setup a new destination from our list of collectors if one isn't set up
+    this.collectors = collectors; 
+    //setup a new destination from our list of collectors if one hasn't been set up
     if (currCollector == null){
       if (collectors.hasNext()){
         currCollector = collectors.next();
       }
       else
-        log.error("No collectors to try in setCollectors()");
+        log.error("No collectors to try in send(), not even trying to do doPost()");
     }
   }
   
   
   /**
-   * grab all of the chunks currently in the chunkQueue, stores a copy of them 
-   * locally, calculates their size, sets them up 
+   * grab all of the chunks currently in the chunkQueue, stores a copy of them locally, calculates
+   * their size, sets them up 
    * @return array of chunk id's which were ACKed by collector
    */
   public List<CommitListEntry> send(List<Chunk> toSend) throws InterruptedException, IOException{
@@ -188,29 +182,14 @@
       //need to pick a destination here
       PostMethod method = new PostMethod();
       try   {
-    	if(blackList.size()!=0) {
-    		for(long time: blackList.keySet()) {
-    			long now = new Date().getTime();
-    			if(now-time > BLACK_LIST_TIME) {
-    	    		log.info(blackList.get(time)+" release from black list.");
-    				blackList.remove(time);
-    			} else if(currCollector.intern()==blackList.get(time)) {
-    				currCollector = collectors.next();
-    			}
-    		}
-    	}
         doPost(method, postData, currCollector);
-        // rotate POST to collectors do not work.  All agent and collectors end up spending time to create TCP connections
-        // but unable to send any data.
-        // currCollector = collectors.next();        
+
         retries = SENDER_RETRIES; //reset count on success
         //if no exception was thrown from doPost, ACK that these chunks were sent
         return commitResults;
       } catch (Throwable e) {
         log.error("Http post exception", e);
         log.info("Checking list of collectors to see if another collector has been specified for rollover");
-        blackList.put(new Date().getTime(), currCollector);
-        log.info("Black list collector: "+currCollector);
         if (collectors.hasNext()){
           currCollector = collectors.next();
           log.info("Found a new collector to roll over to, retrying HTTP Post to collector " + currCollector);
@@ -220,9 +199,6 @@
                 " ms (" + retries + "retries left)");
             Thread.sleep(WAIT_FOR_COLLECTOR_REBOOT);
             retries --;
-            // shuffle the list of collectors if all of them are not available.
-            this.collectors = DataFactory.getInstance().getCollectorURLs();
-            this.blackList.clear();
           } else {
             log.error("No more collectors to try rolling over to; aborting");
             throw new IOException("no collectors");
@@ -234,11 +210,6 @@
         method.releaseConnection();
       }
     } //end retry loop
-    if(currCollector==null) {
-    	// reset the collector list, if ran out of collector to try.
-        this.collectors = DataFactory.getInstance().getCollectorURLs();
-        this.blackList.clear();    	
-    }
     return new ArrayList<CommitListEntry>();
   }