You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by as...@apache.org on 2009/08/13 05:05:14 UTC
svn commit: r803761 - in /hadoop/chukwa/trunk: ./
src/java/org/apache/hadoop/chukwa/datacollection/agent/
src/java/org/apache/hadoop/chukwa/datacollection/connector/http/
src/java/org/apache/hadoop/chukwa/datacollection/writer/
src/java/org/apache/hado...
Author: asrabkin
Date: Thu Aug 13 03:05:13 2009
New Revision: 803761
URL: http://svn.apache.org/viewvc?rev=803761&view=rev
Log:
CHUKWA-373. Test code for backpressure.
Added:
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/NullWriter.java
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestCollector.java
- copied unchanged from r801506, hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/CollectorTest.java
Removed:
hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/CollectorTest.java
Modified:
hadoop/chukwa/trunk/CHANGES.txt
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java
hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java
Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=803761&r1=803760&r2=803761&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Thu Aug 13 03:05:13 2009
@@ -46,6 +46,8 @@
IMPROVEMENTS
+ CHUKWA-373. Test code for backpressure. (asrabkin)
+
CHUKWA-370. Exec adaptor should commit immediately. (asrabkin)
CHUKWA-367. Print metadata in DumpChunks. (asrabkin)
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java?rev=803761&r1=803760&r2=803761&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java Thu Aug 13 03:05:13 2009
@@ -53,6 +53,12 @@
synchronized (this) {
while (chunk.getData().length + dataSize > MAX_MEM_USAGE) {
try {
+ if(dataSize == 0) { //queue is empty, but data is still too big
+ log.error("JUMBO CHUNK SPOTTED: type= " + chunk.getDataType() +
+ " and source =" +chunk.getStreamName());
+ return; //return without sending; otherwise we'd deadlock.
+ //this error should probably be fatal; there's no way to recover.
+ }
metrics.fullQueue.set(1);
this.wait();
log.info("MemLimitQueue is full [" + dataSize + "]");
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java?rev=803761&r1=803760&r2=803761&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java Thu Aug 13 03:05:13 2009
@@ -54,11 +54,16 @@
static Logger log = Logger.getLogger(HttpConnector.class);
- static Timer statTimer = null;
- static volatile int chunkCount = 0;
- static final int MAX_SIZE_PER_POST = 2 * 1024 * 1024;
- static final int MIN_POST_INTERVAL = 5 * 1000;
- static ChunkQueue chunkQueue;
+ Timer statTimer = null;
+ volatile int chunkCount = 0;
+
+ int MAX_SIZE_PER_POST = 2 * 1024 * 1024;
+ int MIN_POST_INTERVAL = 5 * 1000;
+ public static final String MIN_POST_INTERVAL_OPT = "httpConnector.minPostInterval";
+ public static final String MAX_SIZE_PER_POST_OPT = "httpConnector.maxPostSize";
+
+
+ ChunkQueue chunkQueue;
ChukwaAgent agent;
String argDestination = null;
@@ -68,9 +73,8 @@
private Iterator<String> collectors = null;
protected ChukwaSender connectorClient = null;
- static {
+ { //instance initializer block
statTimer = new Timer();
- chunkQueue = DataFactory.getInstance().getEventQueue();
statTimer.schedule(new TimerTask() {
public void run() {
int count = chunkCount;
@@ -93,6 +97,12 @@
}
public void start() {
+
+ chunkQueue = DataFactory.getInstance().getEventQueue();
+ MAX_SIZE_PER_POST = agent.getConfiguration().getInt(MAX_SIZE_PER_POST_OPT,
+ MAX_SIZE_PER_POST);
+ MIN_POST_INTERVAL = agent.getConfiguration().getInt(MIN_POST_INTERVAL_OPT,
+ MIN_POST_INTERVAL);
(new Thread(this, "HTTP post thread")).start();
}
@@ -168,8 +178,7 @@
Thread.sleep(now - lastPost); // wait for stuff to accumulate
lastPost = now;
} // end of try forever loop
- log
- .info("received stop() command so exiting run() loop to shutdown connector");
+ log.info("received stop() command so exiting run() loop to shutdown connector");
} catch (OutOfMemoryError e) {
log.warn("Bailing out", e);
DaemonWatcher.bailout(-1);
Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/NullWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/NullWriter.java?rev=803761&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/NullWriter.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/NullWriter.java Thu Aug 13 03:05:13 2009
@@ -0,0 +1,42 @@
+package org.apache.hadoop.chukwa.datacollection.writer;
+
+import java.util.List;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Minimal writer; does nothing with data.
+ *
+ * Useful primarily as an end-of-pipeline stage, if stuff in the middle
+ * is accomplishing something useful.
+ *
+ */
+public class NullWriter implements ChukwaWriter {
+
+ //in kb per sec
+ int maxDataRate = Integer.MAX_VALUE;
+ public static final String RATE_OPT_NAME = "nullWriter.dataRate";
+ @Override
+ public void add(List<Chunk> chunks) throws WriterException {
+ try {
+ int dataBytes =0;
+ for(Chunk c: chunks)
+ dataBytes +=c.getData().length;
+ if(maxDataRate > 0)
+ Thread.sleep(dataBytes / maxDataRate);
+ } catch(Exception e) {}
+ return;
+ }
+
+ @Override
+ public void close() throws WriterException {
+ return;
+ }
+
+ @Override
+ public void init(Configuration c) throws WriterException {
+ maxDataRate = c.getInt(RATE_OPT_NAME, 0);
+ return;
+ }
+
+}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java?rev=803761&r1=803760&r2=803761&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java Thu Aug 13 03:05:13 2009
@@ -50,7 +50,7 @@
* TeeWriter ---> Client (Chunk serialized as Writable)*
* An indefinite sequence of serialized chunks
*
- * In english: clients should connect and say either "RAW " or "WRITABLE "
+ * In English: clients should connect and say either "RAW " or "WRITABLE "
* followed by a filter. (Note that the keyword is followed by exactly one space.)
* They'll then receive either a sequence of byte arrays or of writable-serialized.
*
@@ -204,8 +204,10 @@
}
public void handle(Chunk c) {
+
+ //don't ever block; just ignore this chunk if we don't have room for it.
if(rules.matches(c))
- sendQ.add(c);
+ sendQ.offer(c);
}
}
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java?rev=803761&r1=803760&r2=803761&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java Thu Aug 13 03:05:13 2009
@@ -20,16 +20,17 @@
import java.util.Random;
-import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.*;
import org.apache.hadoop.chukwa.datacollection.*;
import org.apache.hadoop.chukwa.datacollection.adaptor.*;
import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
+import org.apache.hadoop.conf.Configuration;
public class ConstRateAdaptor extends Thread implements Adaptor {
- private static final int SLEEP_VARIANCE = 200;
- private static final int MIN_SLEEP = 300;
+ private int SLEEP_VARIANCE = 200;
+ private int MIN_SLEEP = 300;
private String type;
private long offset;
@@ -51,6 +52,9 @@
this.type = type;
this.dest = dest;
this.setName("ConstRate Adaptor_" + type);
+ Configuration conf = c.getConfiguration();
+ MIN_SLEEP = conf.getInt("constAdaptor.minSleep", MIN_SLEEP);
+ SLEEP_VARIANCE = conf.getInt("constAdaptor.sleepVariance", SLEEP_VARIANCE);
super.start(); // this is a Thread.start
}
@@ -65,16 +69,17 @@
}
public void run() {
- Random r = new Random();
+ Random timeCoin = new Random();
try {
while (!stopping) {
- int MSToSleep = r.nextInt(SLEEP_VARIANCE) + MIN_SLEEP; // between 1 and
+ int MSToSleep = timeCoin.nextInt(SLEEP_VARIANCE) + MIN_SLEEP; // between 1 and
// 3 secs
// FIXME: I think there's still a risk of integer overflow here
int arraySize = (int) (MSToSleep * (long) bytesPerSec / 1000L);
byte[] data = new byte[arraySize];
- r.nextBytes(data);
+ Random dataPattern = new Random(offset);
offset += data.length;
+ dataPattern.nextBytes(data);
ChunkImpl evt = new ChunkImpl(type, "random data source", offset, data,
this);
@@ -106,16 +111,28 @@
}
- @Override
- public long shutdown(AdaptorShutdownPolicy shutdownPolicy) {
-
- switch(shutdownPolicy) {
- case HARD_STOP :
- case GRACEFULLY :
- case WAIT_TILL_FINISHED :
- stopping = true;
- break;
- }
- return offset;
+ @Override
+ public long shutdown(AdaptorShutdownPolicy shutdownPolicy) {
+
+ switch(shutdownPolicy) {
+ case HARD_STOP :
+ case GRACEFULLY :
+ case WAIT_TILL_FINISHED :
+ stopping = true;
+ break;
}
+ return offset;
+ }
+
+ public static boolean checkChunk(Chunk chunk) {
+ byte[] data = chunk.getData();
+ byte[] correctData = new byte[data.length];
+ Random dataPattern = new Random(chunk.getSeqID());
+ dataPattern.nextBytes(correctData);
+ for(int i=0; i < data.length ; ++i)
+ if(data [i] != correctData[i])
+ return false;
+
+ return true;
+ }
}