You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@chukwa.apache.org by as...@apache.org on 2009/08/28 17:38:18 UTC
svn commit: r808922 - in
/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection:
adaptor/FileAdaptor.java sender/ChukwaHttpSender.java
sender/ChukwaSender.java sender/RetryListOfCollectors.java
test/FileTailerStressTest.java
Author: asrabkin
Date: Fri Aug 28 15:38:17 2009
New Revision: 808922
URL: http://svn.apache.org/viewvc?rev=808922&view=rev
Log:
CHUKWA-379. More refactoring
Modified:
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaSender.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java?rev=808922&r1=808921&r2=808922&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java Fri Aug 28 15:38:17 2009
@@ -137,7 +137,7 @@
*/
private long offsetOfFirstByte = 0;
- public void start( long bytes) {
+ public void start(long bytes) {
// in this case params = filename
log.info("adaptor id: " + adaptorID + " started file adaptor on file "
+ toWatch);
@@ -147,8 +147,6 @@
DEFAULT_TIMEOUT_PERIOD);
this.timeOut = startTime + TIMEOUT_PERIOD;
-
-
tailer.addFileAdaptor(this);
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java?rev=808922&r1=808921&r2=808922&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java Fri Aug 28 15:38:17 2009
@@ -66,6 +66,9 @@
final int MAX_RETRIES_PER_COLLECTOR; // fast retries, in http client
final int SENDER_RETRIES;
final int WAIT_FOR_COLLECTOR_REBOOT;
+ final int COLLECTOR_TIMEOUT;
+
+ public static final String COLLECTOR_TIMEOUT_OPT = "chukwaAgent.sender.collectorTimeout";
// FIXME: this should really correspond to the timer in RetryListOfCollectors
static final HttpSenderMetrics metrics = new HttpSenderMetrics("ChukwaAgent", "chukwaHttpSender");
@@ -128,13 +131,12 @@
// setup default collector
ArrayList<String> tmp = new ArrayList<String>();
this.collectors = tmp.iterator();
- log.info("added a single collector to collector list in ConnectorClient constructor, it's hasNext is now: "
- + collectors.hasNext());
MAX_RETRIES_PER_COLLECTOR = c.getInt("chukwaAgent.sender.fastRetries", 4);
SENDER_RETRIES = c.getInt("chukwaAgent.sender.retries", 144000);
WAIT_FOR_COLLECTOR_REBOOT = c.getInt("chukwaAgent.sender.retryInterval",
20 * 1000);
+ COLLECTOR_TIMEOUT = c.getInt(COLLECTOR_TIMEOUT_OPT, 30*1000);
}
/**
@@ -150,7 +152,7 @@
if (collectors.hasNext()) {
currCollector = collectors.next();
} else
- log.error("No collectors to try in send(), not even trying to do doPost()");
+ log.error("No collectors to try in send(), won't even try to do doPost()");
}
}
@@ -226,10 +228,15 @@
ChukwaHttpSender.metrics.httpThrowable.inc();
if (collectors.hasNext()) {
ChukwaHttpSender.metrics.collectorRollover.inc();
- failedCollector(currCollector);
+ boolean repeatPost = failedCollector(currCollector);
currCollector = collectors.next();
- log.info("Found a new collector to roll over to, retrying HTTP Post to collector "
- + currCollector);
+ if(repeatPost)
+ log.info("Found a new collector to roll over to, retrying HTTP Post to collector "
+ + currCollector);
+ else {
+ log.info("Using " + currCollector + " in the future, but not retrying this post");
+ break;
+ }
} else {
if (retries > 0) {
log.warn("No more collectors to try rolling over to; waiting "
@@ -252,10 +259,12 @@
/**
* A hook for taking action when a collector is declared failed.
+ * Returns whether to retry current post, or junk it
* @param downCollector
*/
- protected void failedCollector(String downCollector) {
+ protected boolean failedCollector(String downCollector) {
log.debug("declaring "+ downCollector + " down");
+ return true;
}
/**
@@ -275,7 +284,7 @@
}
});
- pars.setParameter(HttpMethodParams.SO_TIMEOUT, new Integer(30000));
+ pars.setParameter(HttpMethodParams.SO_TIMEOUT, new Integer(COLLECTOR_TIMEOUT));
method.setParams(pars);
method.setPath(dest);
@@ -318,4 +327,8 @@
}
return resp;
}
+
+ @Override
+ public void stop() {
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaSender.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaSender.java?rev=808922&r1=808921&r2=808922&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaSender.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaSender.java Fri Aug 28 15:38:17 2009
@@ -22,5 +22,7 @@
throws InterruptedException, java.io.IOException;
public void setCollectors(Iterator<String> collectors);
+
+ public void stop();
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java?rev=808922&r1=808921&r2=808922&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java Fri Aug 28 15:38:17 2009
@@ -46,10 +46,7 @@
public RetryListOfCollectors(File collectorFile, Configuration conf)
throws IOException {
- collectors = new ArrayList<String>();
- this.conf = conf;
- portNo = conf.get("chukwaCollector.http.port", "8080");
- maxRetryRateMs = conf.getInt(RETRY_RATE_OPT, 15 * 1000);
+ this(conf);
try {
BufferedReader br = new BufferedReader(new FileReader(collectorFile));
String line, parsedline;
@@ -101,16 +98,22 @@
* @param maxRetryRateMs
*/
public RetryListOfCollectors(final List<String> collectors, Configuration conf) {
+ this(conf);
+ this.collectors.addAll(collectors);
+ //we don't shuffle the list here -- this constructor is only used for test purposes
+ }
+
+ public RetryListOfCollectors(Configuration conf) {
+ collectors = new ArrayList<String>();
+ this.conf = conf;
+ portNo = conf.get("chukwaCollector.http.port", "8080");
maxRetryRateMs = conf.getInt(RETRY_RATE_OPT, 15 * 1000);
lastLookAtFirstNode = 0;
- this.collectors = new ArrayList<String>();
- this.collectors.addAll(collectors);
- shuffleList();
}
// for now, use a simple O(n^2) algorithm.
// safe, because we only do this once, and on smallish lists
- private void shuffleList() {
+ public void shuffleList() {
ArrayList<String> newList = new ArrayList<String>();
Random r = new java.util.Random();
while (!collectors.isEmpty()) {
@@ -138,12 +141,8 @@
return null;
}
- public String getRandomCollector() {
- return collectors.get((int) java.lang.Math.random() * collectors.size());
- }
-
- public void add(URL collector) {
- collectors.add(collector.toString());
+ public void add(String collector) {
+ collectors.add(collector);
}
public void remove() {
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java?rev=808922&r1=808921&r2=808922&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java Fri Aug 28 15:38:17 2009
@@ -25,6 +25,7 @@
import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
import org.apache.hadoop.chukwa.datacollection.writer.ConsoleWriter;
+import org.apache.hadoop.conf.Configuration;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
@@ -72,9 +73,10 @@
Server server = new Server(9990);
Context root = new Context(server, "/", Context.SESSIONS);
- ServletCollector.setWriter(new ConsoleWriter(true));
- root.addServlet(new ServletHolder(new ServletCollector(
- new ChukwaConfiguration(true))), "/*");
+ Configuration conf = new Configuration();
+ ServletCollector collector = new ServletCollector(conf);
+ collector.setWriter(new ConsoleWriter(true));
+ root.addServlet(new ServletHolder(collector), "/*");
server.start();
server.setStopAtShutdown(false);