You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@chukwa.apache.org by as...@apache.org on 2009/10/07 22:25:49 UTC
svn commit: r822896 - in /hadoop/chukwa/trunk: ./ contrib/chukwa-pig/
contrib/xtrace/ src/java/org/apache/hadoop/chukwa/datacollection/adaptor/
src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/
src/java/org/apache/hadoop/chukwa/datac...
Author: asrabkin
Date: Wed Oct 7 20:25:44 2009
New Revision: 822896
URL: http://svn.apache.org/viewvc?rev=822896&view=rev
Log:
CHUKWA-390. Improvements to asynchronous acknowledgement mechanism.
Added:
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestAdaptorTimeout.java
Modified:
hadoop/chukwa/trunk/CHANGES.txt
hadoop/chukwa/trunk/contrib/chukwa-pig/ (props changed)
hadoop/chukwa/trunk/contrib/xtrace/ (props changed)
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestDelayedAcks.java
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestFailedCollectorAck.java
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java
Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=822896&r1=822895&r2=822896&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Wed Oct 7 20:25:44 2009
@@ -58,6 +58,8 @@
IMPROVEMENTS
+ CHUKWA-390. Improvements to asynchronous acknowledgement mechanism. (asrabkin)
+
CHUKWA-397. Allow "all" as search pattern. (asrabkin)
CHUKWA-393. Support using pig on Chunks. (asrabkin)
Propchange: hadoop/chukwa/trunk/contrib/chukwa-pig/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Oct 7 20:25:44 2009
@@ -0,0 +1 @@
+chukwa-pig.jar
Propchange: hadoop/chukwa/trunk/contrib/xtrace/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Oct 7 20:25:44 2009
@@ -0,0 +1 @@
+chukwa-xtrace.jar
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java?rev=822896&r1=822895&r2=822896&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/Adaptor.java Wed Oct 7 20:25:44 2009
@@ -37,6 +37,13 @@
* with k < n, it is allowed to send different values for bytes k through n the
* second time around. However, the stream must still be parseable, assuming
* that bytes 0-k come from the first run,and bytes k - n come from the second.
+ *
+ * Note that Adaptor implements neither equals() nor hashCode(). It is never
+ * safe to compare two adaptors with equals(). It is safe to use adaptors
+ * as hash table keys, though two distinct Adaptors will appear as two distinct
+ * keys. This is the desired behavior, since it means that messages intended
+ * for one Adaptor will never be received by another, even across Adaptor
+ * restarts.
*/
public interface Adaptor {
/**
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java?rev=822896&r1=822895&r2=822896&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/DirTailingAdaptor.java Wed Oct 7 20:25:44 2009
@@ -31,6 +31,9 @@
*
* Mandatory first parameter is a directory. Mandatory second parameter
* is the name of an adaptor to start.
+ *
+ * If the specified directory does not exist, the DirTailer will continue
+ * running, and will start tailing if the directory is later created.
*
*/
public class DirTailingAdaptor extends AbstractAdaptor implements Runnable {
@@ -45,8 +48,6 @@
long scanInterval;
String adaptorName; //name of adaptors to start
-
-
static Pattern cmd = Pattern.compile("(.+)\\s+(\\S+)");
@Override
public void start(long offset) throws AdaptorException {
@@ -69,10 +70,10 @@
scanDirHierarchy(baseDir);
lastSweepStartTime=sweepStartTime;
control.reportCommit(this, lastSweepStartTime);
- Thread.sleep(scanInterval);
} catch(IOException e) {
log.warn(e);
}
+ Thread.sleep(scanInterval);
}
} catch(InterruptedException e) {
}
@@ -82,6 +83,8 @@
* Coded recursively. Base case is a single non-dir file.
*/
private void scanDirHierarchy(File dir) throws IOException {
+ if(!dir.exists())
+ return;
if(!dir.isDirectory() ) {
//Don't start tailing if we would have gotten it on the last pass
if(dir.lastModified() >= lastSweepStartTime) {
@@ -128,7 +131,6 @@
public long shutdown(AdaptorShutdownPolicy shutdownPolicy)
throws AdaptorException {
continueScanning = false;
-
return lastSweepStartTime;
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java?rev=822896&r1=822895&r2=822896&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java Wed Oct 7 20:25:44 2009
@@ -34,6 +34,10 @@
* A base class for file tailing adaptors.
* Intended to mandate as little policy as possible, and to use as
* few system resources as possible.
+ *
+ *
+ * If the file does not exist, this class will continue to retry quietly
+ * forever and will start tailing if it's eventually created.
*/
public class LWFTAdaptor extends AbstractAdaptor {
@@ -41,6 +45,7 @@
* This is the maximum amount we'll read from any one file before moving on to
* the next. This way, we get quick response time for other files if one file
* is growing rapidly.
+ *
*/
public static final int DEFAULT_MAX_READ_SIZE = 128 * 1024;
public static final String MAX_READ_SIZE_OPT =
@@ -200,11 +205,15 @@
throws InterruptedException {
boolean hasMoreData = false;
try {
+
+ //if file doesn't exist, length =0 and we just keep waiting for it.
+ //if(!toWatch.exists())
+ // deregisterAndStop(false);
+
long len = toWatch.length();
if(len < fileReadOffset) {
//file shrank; probably some data went missing.
handleShrunkenFile(len);
-
} else if(len > fileReadOffset) {
RandomAccessFile reader = new RandomAccessFile(toWatch, "r");
slurp(len, reader);
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java?rev=822896&r1=822895&r2=822896&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java Wed Oct 7 20:25:44 2009
@@ -21,6 +21,7 @@
import org.apache.hadoop.conf.*;
import org.apache.hadoop.chukwa.datacollection.DataFactory;
import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor;
+import org.apache.hadoop.chukwa.datacollection.collector.servlet.CommitCheckServlet;
import org.apache.hadoop.chukwa.datacollection.sender.AsyncAckSender;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
@@ -28,7 +29,9 @@
public class AdaptorResetThread extends Thread {
- Logger log = Logger.getLogger(AdaptorResetThread.class);
+ static Logger log = Logger.getLogger(AdaptorResetThread.class);
+ public static final String TIMEOUT_OPT = "connector.commitpoll.timeout";
+
int resetCount = 0;
private static class AdaptorStat {
@@ -40,24 +43,34 @@
}
}
+
+ int timeout = 15*60 * 1000; //default to wait fifteen minutes for an ack
+ //note that this is overridden using the poll and rotate periods.
+
Map<Adaptor, AdaptorStat> status;
- int timeout = 10*60 * 1000; //default to wait ten minutes for an ack
ChukwaAgent agent;
- public static final String TIMEOUT_OPT = "connector.commitpoll.timeout";
private volatile boolean running = true;
public AdaptorResetThread(Configuration conf, ChukwaAgent a) {
- timeout = 2* conf.getInt(SeqFileWriter.ROTATE_INTERVAL_OPT, timeout/2);
- //default to 2x rotation interval, if rotate interval is defined.
- timeout = conf.getInt(TIMEOUT_OPT, timeout);
- //or explicitly set timeout
+ //
+ timeout = conf.getInt(SeqFileWriter.ROTATE_INTERVAL_OPT, timeout/3)
+ + conf.getInt(AsyncAckSender.POLLPERIOD_OPT, timeout/3)
+ + conf.getInt(CommitCheckServlet.SCANPERIOD_OPT, timeout/3);
+
+ timeout = conf.getInt(TIMEOUT_OPT, timeout); //unless overridden
+
status = new LinkedHashMap<Adaptor, AdaptorStat>();
this.agent = a;
this.setDaemon(true);
}
- public void resetTimedOutAdaptors(int timeSinceLastCommit) {
-
+ /**
+ * Resets all adaptors with outstanding data more than timeSinceLastCommit old.
+ * @param timeSinceLastCommit
+ * @return the number of reset adaptors
+ */
+ public int resetTimedOutAdaptors(int timeSinceLastCommit) {
+ int resetThisTime = 0;
long timeoutThresh = System.currentTimeMillis() - timeSinceLastCommit;
List<Adaptor> toResetList = new ArrayList<Adaptor>(); //also contains stopped
//adaptors
@@ -67,45 +80,55 @@
ChukwaAgent.Offset off = agent.offset(ent.getKey());
if(off == null) {
toResetList.add(ent.getKey());
- } else if(stat.maxByteSent > off.offset
- && stat.lastCommitTime < timeoutThresh) {
+ } else if(stat.maxByteSent > off.offset //some data outstanding
+ && stat.lastCommitTime < timeoutThresh) { //but no progress made
toResetList.add(ent.getKey());
- log.warn("restarting " + off.id + " at " + off.offset + " due to collector timeout");
+ log.warn("restarting " + off.id + " at " + off.offset + " due to timeout; "+
+ "last commit was " + (System.currentTimeMillis() - stat.lastCommitTime) + " ms ago");
}
}
}
for(Adaptor a: toResetList) {
- status.remove(a);
+ status.remove(a); //it'll get added again when adaptor resumes, if it does
ChukwaAgent.Offset off = agent.offset(a);
if(off != null) {
agent.stopAdaptor(off.id, false);
- //We can do this safely if we're called in the same thread as the sends,
- //since then we'll be synchronous with sends, and guaranteed to be
- //interleaved between two successive sends
- //DataFactory.getInstance().getEventQueue().purgeAdaptor(a);
-
String a_status = a.getCurrentStatus();
agent.processAddCommand("add " + off.id + "= " + a.getClass().getCanonicalName()
+ " "+ a_status + " " + off.offset);
- resetCount ++;
+ resetThisTime ++;
//will be implicitly added to table once adaptor starts sending
}
- //implicitly do nothing if adaptor was stopped
+ //implicitly do nothing if adaptor was stopped. We already removed
+ //its entry from the status table.
}
+ resetCount += resetThisTime;
+ return resetThisTime;
}
public synchronized void reportPending(List<AsyncAckSender.CommitListEntry> delayedCommits) {
+ long now = System.currentTimeMillis();
for(AsyncAckSender.CommitListEntry dc: delayedCommits) {
AdaptorStat a = status.get(dc.adaptor);
if(a == null)
- status.put(dc.adaptor, new AdaptorStat(0, dc.uuid));
+ status.put(dc.adaptor, new AdaptorStat(now, dc.uuid));
else if(a.maxByteSent < dc.uuid)
a.maxByteSent = dc.uuid;
}
}
+ public synchronized void reportCommits(Set<Adaptor> commits) {
+ long now = System.currentTimeMillis();
+ for(Adaptor a: commits) {
+ if(status.containsKey(a)) {
+ status.get(a).lastCommitTime = now;
+ } else
+ log.warn("saw commit for adaptor " + a + " before seeing sends");
+ }
+ }
+
public void reportStop(Adaptor a) {
status.remove(a);
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=822896&r1=822895&r2=822896&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java Wed Oct 7 20:25:44 2009
@@ -43,6 +43,7 @@
import org.apache.hadoop.chukwa.datacollection.agent.metrics.AgentMetrics;
import org.apache.hadoop.chukwa.datacollection.connector.Connector;
import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
+import org.apache.hadoop.chukwa.datacollection.sender.ChukwaHttpSender.CommitListEntry;
import org.apache.hadoop.chukwa.datacollection.test.ConsoleOutConnector;
import org.apache.hadoop.chukwa.util.AdaptorNamingUtils;
import org.apache.hadoop.chukwa.util.DaemonWatcher;
@@ -70,7 +71,7 @@
Connector connector = null;
// doesn't need an equals(), comparator, etc
- static class Offset {
+ public static class Offset {
public Offset(long l, String id) {
offset = l;
this.id = id;
@@ -78,6 +79,9 @@
final String id;
volatile long offset;
+ public long offset() {
+ return this.offset;
+ }
}
public static class AlreadyRunningException extends Exception {
@@ -571,7 +575,7 @@
}
}
- Offset offset(Adaptor a) {
+ public Offset offset(Adaptor a) {
Offset o = adaptorPositions.get(a);
return o;
}
@@ -646,19 +650,16 @@
}
/**
- * Returns the last offset at which a given adaptor was checkpointed
- *
- * @param a the adaptor in question
- * @return that adaptor's last-checkpointed offset
- */
- long getOffset(Adaptor a) { //FIXME: do we need this method?
- return adaptorPositions.get(a).offset;
- }
-
- /**
* Returns the control socket for this agent.
*/
private AgentControlSocketListener getControlSock() {
return controlSock;
}
+
+ public String getAdaptorName(Adaptor initiator) {
+ Offset o = adaptorPositions.get(initiator);
+ if(o != null)
+ return o.id;
+ else return null;
+ }
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java?rev=822896&r1=822895&r2=822896&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java Wed Oct 7 20:25:44 2009
@@ -28,6 +28,7 @@
import org.apache.log4j.Logger;
import java.util.*;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
import org.apache.hadoop.chukwa.extraction.archive.SinkArchiver;
import org.apache.hadoop.conf.Configuration;
@@ -129,7 +130,7 @@
oldEntries = new PriorityQueue<PurgeTask>();
checkInterval = conf.getInt(SCANPERIOD_OPT, checkInterval);
- String sinkPath = conf.get("chukwaCollector.outputDir", "/chukwa/logs");
+ String sinkPath = conf.get(SeqFileWriter.OUTPUT_DIR_OPT, "/chukwa/logs");
pathsToSearch.add(new Path(sinkPath));
String additionalSearchPaths = conf.get(SCANPATHS_OPT, "");
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java?rev=822896&r1=822895&r2=822896&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java Wed Oct 7 20:25:44 2009
@@ -58,28 +58,39 @@
*/
public static class DelayedCommit extends CommitListEntry implements Comparable<DelayedCommit> {
final String fname;
- final long offset;
- public DelayedCommit(Adaptor a, long uuid, String fname, long offset) {
- super(a, uuid);
+ long fOffset;
+ final String aName;
+ public DelayedCommit(Adaptor a, long uuid, long start, String fname,
+ long offset, String aName) {
+ super(a, uuid, start); //start = stream offset of the chunk's first byte
this.fname = fname;
- this.offset = offset;
+ this.fOffset = offset;
+ this.aName = aName;
}
@Override
public int hashCode() {
- return super.hashCode() ^ fname.hashCode() ^ (int)(offset) ^ (int) (offset >> 32);
+ return super.hashCode() ^ fname.hashCode() ^ (int)(fOffset) ^ (int) (fOffset >> 32);
}
+ //group by adaptor name, then by file name (both descending), then ascending by start offset
+ //note that returning 1 means this is "greater" than RHS
public int compareTo(DelayedCommit o) {
- if(o.uuid < uuid)
+ int c = o.aName.compareTo(this.aName);
+ if(c != 0)
+ return c;
+ c = o.fname.compareTo(this.fname);
+ if(c != 0)
+ return c;
+ if(o.start < start)
return 1;
- else if(o.uuid > uuid)
+ else if(o.start > start)
return -1;
else return 0;
}
public String toString() {
- return adaptor +" commits up to " + uuid + " when " + fname + " hits " + offset;
+ return adaptor +" commits from " + start + " to " + uuid + " when " + fname + " hits " + fOffset;
}
}
@@ -88,35 +99,29 @@
final ChukwaAgent agent;
/*
- * The merge table stores commits that we're expecting, before they're handed
- * to the CommitPollThread. There will be only one entry for each adaptor.
+ * The list of commits that we're expecting.
+ * This is the structure used to pass the list to the CommitPollThread.
+ * Adjacent commits to the same file will be coalesced.
*
- * values are a collection of delayed commits, one per adaptor.
- * keys are unspecified
*/
- final Map<String, DelayedCommit> mergeTable;
+ final List<DelayedCommit> mergedList;
/**
* Periodically scans a subset of the collectors, looking for committed files.
* This way, not every collector is pestering the namenode with periodic lses.
*/
- static final class CommitPollThread extends Thread {
+ final class CommitPollThread extends Thread {
private ChukwaHttpSender scanPath;
private int pollPeriod = 1000 * 30;
private final Map<String, PriorityQueue<DelayedCommit>> pendingCommits;
- private final Map<String, DelayedCommit> mergeTable;
- private final ChukwaAgent agent;
- CommitPollThread(Configuration conf, ChukwaAgent agent,
- Map<String, DelayedCommit> mergeTable, Iterator<String> tryList) {
+ CommitPollThread(Configuration conf, Iterator<String> tryList) {
pollPeriod = conf.getInt(POLLPERIOD_OPT, pollPeriod);
scanPath = new ChukwaHttpSender(conf);
scanPath.setCollectors(tryList);
pendingCommits = new HashMap<String, PriorityQueue<DelayedCommit>>();
- this.mergeTable = mergeTable;
- this.agent = agent;
}
private volatile boolean running = true;
@@ -144,18 +149,18 @@
* from the same thread that will later check for commits
*/
private void mergePendingTable() {
- synchronized(mergeTable) {
- for(DelayedCommit dc: mergeTable.values()) {
+ synchronized(mergedList) {
+ for(DelayedCommit dc:mergedList) {
- PriorityQueue<DelayedCommit> map = pendingCommits.get(dc.fname);
- if(map == null) {
- map = new PriorityQueue<DelayedCommit>();
- pendingCommits.put(dc.fname, map);
+ PriorityQueue<DelayedCommit> pendList = pendingCommits.get(dc.fname);
+ if(pendList == null) {
+ pendList = new PriorityQueue<DelayedCommit>();
+ pendingCommits.put(dc.fname, pendList);
}
- map.add(dc);
+ pendList.add(dc);
}
- mergeTable.clear();
- }
+ mergedList.clear();
+ } //end synchronized
}
Pattern respLine = Pattern.compile("<li>(.*) ([0-9]+)</li>");
@@ -177,24 +182,29 @@
if(delayedOnFile == null)
continue;
+ HashSet<Adaptor> committed = new HashSet<Adaptor>();
while(!delayedOnFile.isEmpty()) {
DelayedCommit fired = delayedOnFile.element();
- if(fired.offset > committedOffset)
+ if(fired.fOffset > committedOffset)
break;
- else
+ else {
+ ChukwaAgent.Offset o = agent.offset(fired.adaptor);
+ if(o != null && fired.start > o.offset()) {
+ log.error("can't commit without ordering assumption");
+ break; //don't commit
+ }
delayedOnFile.remove();
- String s = agent.reportCommit(fired.adaptor, fired.uuid);
- //TODO: if s == null, then the adaptor has been stopped.
- //should we stop sending acks?
- log.info("COMMIT to "+ committedOffset+ " on "+ path+ ", updating " +s);
+ String s = agent.reportCommit(fired.adaptor, fired.uuid);
+ committed.add(fired.adaptor);
+ //TODO: if s == null, then the adaptor has been stopped.
+ //should we stop sending acks?
+ log.info("COMMIT to "+ committedOffset+ " on "+ path+ ", updating " +s);
+ }
}
+ adaptorReset.reportCommits(committed);
}
}
- void setScannableCollectors(Iterator<String> collectorURLs) {
- // TODO Auto-generated method stub
-
- }
}
CommitPollThread pollThread;
@@ -208,9 +218,10 @@
log.info("delayed-commit processing enabled");
agent = a;
- mergeTable = new LinkedHashMap<String, DelayedCommit>();
+ mergedList = new ArrayList<DelayedCommit>();
this.conf = conf;
adaptorReset = new AdaptorResetThread(conf, a);
+ adaptorReset.start();
//initialize the commitpoll later, once we have the list of collectors
}
@@ -239,7 +250,7 @@
tryList = l.iterator();
}
- pollThread = new CommitPollThread(conf, agent, mergeTable, tryList);
+ pollThread = new CommitPollThread(conf, tryList);
pollThread.setDaemon(true);
pollThread.start();
}
@@ -252,19 +263,24 @@
* read by the CommitPollThread when it figures out what commits are expected.
*/
private void delayCommits(List<DelayedCommit> delayed) {
- String[] keys = new String[delayed.size()];
- int i = 0;
- for(DelayedCommit c: delayed) {
- String adaptorKey = c.adaptor.hashCode() + "_" + c.adaptor.getCurrentStatus().hashCode();
- keys[i++] = c.fname +"::" + adaptorKey;
- }
- synchronized(mergeTable) {
- for(i = 0; i < keys.length; ++i) {
- DelayedCommit cand = delayed.get(i);
- DelayedCommit cur = mergeTable.get(keys[i]);
- if(cur == null || cand.offset > cur.offset)
- mergeTable.put(keys[i], cand);
+ Collections.sort(delayed);
+
+ synchronized(mergedList) {
+ DelayedCommit region =null;
+ for(DelayedCommit cur: delayed) {
+ if(region == null)
+ region = cur;
+ else if((cur.adaptor == region.adaptor) &&
+ cur.fname.equals(region.fname) && (cur.start <= region.uuid)) {
+ //since the list is sorted, region.start <= cur.start
+ region.uuid = Math.max(region.uuid, cur.uuid); //merge
+ region.fOffset = Math.max(region.fOffset, cur.fOffset);
+ } else {
+ mergedList.add(region);
+ region= cur;
+ }
}
+ if(region != null) mergedList.add(region); //empty input must not enqueue null
}
}
@@ -276,8 +292,10 @@
throws IOException, InterruptedException {
adaptorReset.reportPending(expectedCommitResults);
List<String> resp = reliablySend(method, ServletCollector.PATH);
- List<DelayedCommit> toDelay = new ArrayList<DelayedCommit>();
+ //expect most of 'em to be delayed
+ List<DelayedCommit> toDelay = new ArrayList<DelayedCommit>(resp.size());
ArrayList<CommitListEntry> result = new ArrayList<CommitListEntry>();
+
for(int i = 0; i < resp.size(); ++i) {
if(resp.get(i).startsWith(ServletCollector.ACK_PREFIX))
result.add(expectedCommitResults.get(i));
@@ -288,8 +306,11 @@
log.warn("unexpected response: "+ resp.get(i));
else
log.info("waiting for " + m.group(1) + " to hit " + m.group(2) + " before committing "+ cle.adaptor);
- toDelay.add(new DelayedCommit(cle.adaptor, cle.uuid, m.group(1),
- Long.parseLong(m.group(2))));
+
+ String name = agent.getAdaptorName(cle.adaptor);
+ if(name != null)//null name implies adaptor no longer running
+ toDelay.add(new DelayedCommit(cle.adaptor, cle.uuid, cle.start, m.group(1),
+ Long.parseLong(m.group(2)), name));
}
}
delayCommits(toDelay);
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java?rev=822896&r1=822895&r2=822896&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java Wed Oct 7 20:25:44 2009
@@ -89,10 +89,11 @@
public static class CommitListEntry {
public Adaptor adaptor;
public long uuid;
-
- public CommitListEntry(Adaptor a, long uuid) {
+ public long start; //how many bytes of stream
+ public CommitListEntry(Adaptor a, long uuid, long start) {
adaptor = a;
this.uuid = uuid;
+ this.start = start;
}
}
@@ -183,7 +184,8 @@
// store a CLE for this chunk which we will use to ack this chunk to the
// caller of send()
// (e.g. the agent will use the list of CLE's for checkpointing)
- commitResults.add(new CommitListEntry(c.getInitiator(), c.getSeqID()));
+ commitResults.add(new CommitListEntry(c.getInitiator(), c.getSeqID(),
+ c.getSeqID() - c.getData().length));
}
toSend.clear();
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java?rev=822896&r1=822895&r2=822896&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java Wed Oct 7 20:25:44 2009
@@ -53,6 +53,7 @@
public static final String STAT_PERIOD_OPT = "chukwaCollector.stats.period";
public static final String ROTATE_INTERVAL_OPT = "chukwaCollector.rotateInterval";
+ public static final String OUTPUT_DIR_OPT= "chukwaCollector.outputDir";
static String localHostAddr = null;
final Semaphore lock = new Semaphore(1, true);
@@ -94,7 +95,7 @@
}
public void init(Configuration conf) throws WriterException {
- outputDir = conf.get("chukwaCollector.outputDir", "/chukwa");
+ outputDir = conf.get(OUTPUT_DIR_OPT, "/chukwa");
this.conf = conf;
Added: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestAdaptorTimeout.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestAdaptorTimeout.java?rev=822896&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestAdaptorTimeout.java (added)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestAdaptorTimeout.java Wed Oct 7 20:25:44 2009
@@ -0,0 +1,55 @@
+package org.apache.hadoop.chukwa.datacollection.collector;
+import java.io.File;
+import org.apache.hadoop.chukwa.datacollection.adaptor.TestDirTailingAdaptor;
+import org.apache.hadoop.chukwa.datacollection.agent.AdaptorResetThread;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.collector.servlet.CommitCheckServlet;
+import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
+import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
+import org.apache.hadoop.chukwa.datacollection.sender.AsyncAckSender;
+import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
+import org.apache.hadoop.chukwa.util.ConstRateAdaptor;
+import org.apache.hadoop.conf.Configuration;
+import org.mortbay.jetty.Server;
+import junit.framework.TestCase;
+
+
+
+public class TestAdaptorTimeout extends TestCase {
+ static final int PORTNO = 9997;
+ static final int TEST_DURATION_SECS = 30;
+ static int SEND_RATE = 10* 1000; //bytes/sec
+
+ public void testAdaptorTimeout() throws Exception {
+ Configuration conf = new Configuration();
+
+ String outputDirectory = TestDelayedAcks.buildConf(conf);
+ conf.setInt(AdaptorResetThread.TIMEOUT_OPT, 1000);
+ ServletCollector collector = new ServletCollector(conf);
+ Server collectorServ = TestDelayedAcks.startCollectorOnPort(conf, PORTNO, collector);
+ Thread.sleep(1000);
+
+ ChukwaAgent agent = new ChukwaAgent(conf);
+ HttpConnector conn = new HttpConnector(agent, "http://localhost:"+PORTNO+"/");
+ conn.start();
+ String resp = agent.processAddCommand("add constSend = " + ConstRateAdaptor.class.getCanonicalName() +
+ " testData "+ SEND_RATE + " 0");
+ assertTrue("constSend".equals(resp));
+ Thread.sleep(TEST_DURATION_SECS * 1000);
+
+ AsyncAckSender sender = (AsyncAckSender)conn.getSender();
+ int resets = sender.adaptorReset.getResetCount();
+ System.out.println(resets + " resets");
+ assertTrue(resets > 0);
+
+ agent.shutdown();
+ collectorServ.stop();
+ conn.shutdown();
+ Thread.sleep(5000); //for collector to shut down
+
+ long dups = TestFailedCollectorAck.checkDirs(conf, conf.get(SeqFileWriter.OUTPUT_DIR_OPT));
+ assertTrue(dups > 0);
+ TestDirTailingAdaptor.nukeDirContents(new File(outputDirectory));
+ }
+
+}
Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestDelayedAcks.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestDelayedAcks.java?rev=822896&r1=822895&r2=822896&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestDelayedAcks.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestDelayedAcks.java Wed Oct 7 20:25:44 2009
@@ -84,16 +84,17 @@
Chunk c1 = chunks.waitForAChunk();
assertNotNull(c1);
List<CommitListEntry> pendingAcks = new ArrayList<CommitListEntry>();
- pendingAcks.add(new DelayedCommit(c1.getInitiator(), c1.getSeqID(), "foo", c1.getSeqID()));
+ pendingAcks.add(new DelayedCommit(c1.getInitiator(), c1.getSeqID(),
+ c1.getSeqID() - c1.getData().length, "foo", c1.getSeqID(), agent.getAdaptorName(c1.getInitiator())));
restart.reportPending(pendingAcks);
assertEquals(len, c1.getData().length);
Thread.sleep(ACK_TIMEOUT*2);
- restart.resetTimedOutAdaptors(ACK_TIMEOUT);
+ int resetCount = restart.resetTimedOutAdaptors(ACK_TIMEOUT);
Chunk c2 = chunks.waitForAChunk(1000);
assertNotNull(c2);
assertEquals(len, c2.getData().length);
- assertTrue(restart.getResetCount() > 0);
+ assertTrue(resetCount > 0);
agent.shutdown();
//start an adaptor -- chunks should appear in the connector
//wait for timeout. More chunks should appear.
@@ -119,7 +120,7 @@
String outputDirectory = tempDir.getPath() + "/test_DA" + System.currentTimeMillis();
String seqWriterOutputDir = outputDirectory +"/seqWriter/seqOutputDir";
- conf.set("chukwaCollector.outputDir", seqWriterOutputDir );
+ conf.set(SeqFileWriter.OUTPUT_DIR_OPT, seqWriterOutputDir );
writer.init(conf);
ArrayList<Chunk> oneChunk = new ArrayList<Chunk>();
@@ -178,7 +179,7 @@
conf.setInt("chukwaCollector.rotateInterval", ROTATEPERIOD);
conf.set("writer.hdfs.filesystem", "file:///");
String seqWriterOutputDir = outputDirectory +"/chukwa_sink";
- conf.set("chukwaCollector.outputDir", seqWriterOutputDir );
+ conf.set(SeqFileWriter.OUTPUT_DIR_OPT, seqWriterOutputDir );
conf.setInt(AsyncAckSender.POLLPERIOD_OPT, CLIENT_SCANPERIOD);
conf.setInt(CommitCheckServlet.SCANPERIOD_OPT, SERVER_SCANPERIOD);
conf.setBoolean(HttpConnector.ASYNC_ACKS_OPT, true);
Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestFailedCollectorAck.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestFailedCollectorAck.java?rev=822896&r1=822895&r2=822896&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestFailedCollectorAck.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestFailedCollectorAck.java Wed Oct 7 20:25:44 2009
@@ -41,9 +41,9 @@
sinkB.mkdir();
conf.set(CommitCheckServlet.SCANPATHS_OPT, sinkA.getCanonicalPath()
+ "," + sinkB.getCanonicalPath());
- conf.set("chukwaCollector.outputDir", sinkA.getCanonicalPath() );
+ conf.set(SeqFileWriter.OUTPUT_DIR_OPT, sinkA.getCanonicalPath() );
ServletCollector collector1 = new ServletCollector(new Configuration(conf));
- conf.set("chukwaCollector.outputDir",sinkB.getCanonicalPath() );
+ conf.set(SeqFileWriter.OUTPUT_DIR_OPT,sinkB.getCanonicalPath() );
ServletCollector collector2 = new ServletCollector(conf);
Server collector1_s = TestDelayedAcks.startCollectorOnPort(conf, PORTNO+1, collector1);
Server collector2_s = TestDelayedAcks.startCollectorOnPort(conf, PORTNO+2, collector2);
@@ -86,7 +86,8 @@
}
}
- public void checkDirs(Configuration conf, String paths) throws IOException {
+ //returns number of dup chunks
+ public static long checkDirs(Configuration conf, String paths) throws IOException {
ArrayList<Path> toScan = new ArrayList<Path>();
ArrayList<ByteRange> bytes = new ArrayList<ByteRange>();
@@ -100,9 +101,7 @@
for(Path p: toScan) {
FileStatus[] dataSinkFiles = localfs.listStatus(p, SinkArchiver.DATA_SINK_FILTER);
- for(FileStatus fstatus: dataSinkFiles) {
- System.out.println(fstatus.getPath().getName());
- }
+
for(FileStatus fstatus: dataSinkFiles) {
if(!fstatus.getPath().getName().endsWith(".done"))
continue;
@@ -129,6 +128,7 @@
System.out.println(s);
}
assertEquals(0, sm.missingBytes);
+ return sm.dupBytes;
}
}
Modified: hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java?rev=822896&r1=822895&r2=822896&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java (original)
+++ hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaWriters.java Wed Oct 7 20:25:44 2009
@@ -80,7 +80,7 @@
confSeqWriter.set("chukwaCollector.rotateInterval", "300000");
confSeqWriter.set("writer.hdfs.filesystem", "file:///");
String seqWriterOutputDir = outputDirectory +"/seqWriter/seqOutputDir";
- confSeqWriter.set("chukwaCollector.outputDir", seqWriterOutputDir );
+ confSeqWriter.set(SeqFileWriter.OUTPUT_DIR_OPT, seqWriterOutputDir );
seqWriter.init(confSeqWriter);
Thread.sleep(5000);