You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2008/12/10 07:48:29 UTC
svn commit: r725010 - in /hadoop/core/trunk: CHANGES.txt
src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
Author: cdouglas
Date: Tue Dec 9 22:48:28 2008
New Revision: 725010
URL: http://svn.apache.org/viewvc?rev=725010&view=rev
Log:
HADOOP-4805. Remove black list collector from Chukwa Agent HTTP Sender. Contributed by Eric Yang.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=725010&r1=725009&r2=725010&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Dec 9 22:48:28 2008
@@ -329,6 +329,9 @@
HADOOP-4708. Add binaries missed in the initial checkin for Chukwa. (Eric
Yang via cdouglas)
+ HADOOP-4805. Remove black list collector from Chukwa Agent HTTP Sender.
+ (Eric Yang via cdouglas)
+
Release 0.19.1 - Unreleased
IMPROVEMENTS
Modified: hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java?rev=725010&r1=725009&r2=725010&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java (original)
+++ hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java Tue Dec 9 22:48:28 2008
@@ -26,10 +26,8 @@
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.ArrayList;
-import java.util.Date;
import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpException;
@@ -41,7 +39,6 @@
import org.apache.commons.httpclient.methods.RequestEntity;
import org.apache.commons.httpclient.params.HttpMethodParams;
import org.apache.hadoop.chukwa.Chunk;
-import org.apache.hadoop.chukwa.datacollection.DataFactory;
import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.log4j.Logger;
@@ -50,8 +47,7 @@
* Encapsulates all of the http setup and connection details needed for
* chunks to be delivered to a collector.
* <p>
- * On error, tries the list of available collectors, pauses for a minute,
- * and then repeats.
+ * On error, tries the list of available collectors, pauses for a minute, and then repeats.
* </p>
* <p> Will wait forever for collectors to come up. </p>
*/
@@ -60,13 +56,13 @@
static final int SENDER_RETRIES = 14440;
static final int WAIT_FOR_COLLECTOR_REBOOT = 20 * 1000;
//FIXME: this should really correspond to the timer in RetryListOfCollectors
- static final int BLACK_LIST_TIME = 300 * 1000;
+
static Logger log = Logger.getLogger(ChukwaHttpSender.class);
static HttpClient client = null;
static MultiThreadedHttpConnectionManager connectionManager = null;
static String currCollector = null;
- protected static ConcurrentHashMap<Long, String> blackList = null;
+
protected Iterator<String> collectors;
static
@@ -119,11 +115,10 @@
}
public ChukwaHttpSender(){
+ //setup default collector
ArrayList<String> tmp = new ArrayList<String>();
this.collectors = tmp.iterator();
- ConcurrentHashMap<Long, String> tmpHash = new ConcurrentHashMap<Long, String>();
- this.blackList = tmpHash;
- log.info("setting collectors to an empty iterator");
+ log.info("added a single collector to collector list in ConnectorClient constructor, it's hasNext is now: " + collectors.hasNext());
}
@@ -139,22 +134,21 @@
* @param collectors
*/
public void setCollectors(Iterator<String> collectors){
- this.collectors = collectors;
- this.blackList.clear();
- //setup a new destination from our list of collectors if one isn't set up
+ this.collectors = collectors;
+ //setup a new destination from our list of collectors if one hasn't been set up
if (currCollector == null){
if (collectors.hasNext()){
currCollector = collectors.next();
}
else
- log.error("No collectors to try in setCollectors()");
+ log.error("No collectors to try in send(), not even trying to do doPost()");
}
}
/**
- * grab all of the chunks currently in the chunkQueue, stores a copy of them
- * locally, calculates their size, sets them up
+ * grab all of the chunks currently in the chunkQueue, stores a copy of them locally, calculates
+ * their size, sets them up
* @return array of chunk id's which were ACKed by collector
*/
public List<CommitListEntry> send(List<Chunk> toSend) throws InterruptedException, IOException{
@@ -188,29 +182,14 @@
//need to pick a destination here
PostMethod method = new PostMethod();
try {
- if(blackList.size()!=0) {
- for(long time: blackList.keySet()) {
- long now = new Date().getTime();
- if(now-time > BLACK_LIST_TIME) {
- log.info(blackList.get(time)+" release from black list.");
- blackList.remove(time);
- } else if(currCollector.intern()==blackList.get(time)) {
- currCollector = collectors.next();
- }
- }
- }
doPost(method, postData, currCollector);
- // rotate POST to collectors do not work. All agent and collectors end up spending time to create TCP connections
- // but unable to send any data.
- // currCollector = collectors.next();
+
retries = SENDER_RETRIES; //reset count on success
//if no exception was thrown from doPost, ACK that these chunks were sent
return commitResults;
} catch (Throwable e) {
log.error("Http post exception", e);
log.info("Checking list of collectors to see if another collector has been specified for rollover");
- blackList.put(new Date().getTime(), currCollector);
- log.info("Black list collector: "+currCollector);
if (collectors.hasNext()){
currCollector = collectors.next();
log.info("Found a new collector to roll over to, retrying HTTP Post to collector " + currCollector);
@@ -220,9 +199,6 @@
" ms (" + retries + "retries left)");
Thread.sleep(WAIT_FOR_COLLECTOR_REBOOT);
retries --;
- // shuffle the list of collectors if all of them are not available.
- this.collectors = DataFactory.getInstance().getCollectorURLs();
- this.blackList.clear();
} else {
log.error("No more collectors to try rolling over to; aborting");
throw new IOException("no collectors");
@@ -234,11 +210,6 @@
method.releaseConnection();
}
} //end retry loop
- if(currCollector==null) {
- // reset the collector list, if ran out of collector to try.
- this.collectors = DataFactory.getInstance().getCollectorURLs();
- this.blackList.clear();
- }
return new ArrayList<CommitListEntry>();
}