Posted to commits@chukwa.apache.org by as...@apache.org on 2009/08/13 05:05:14 UTC

svn commit: r803761 - in /hadoop/chukwa/trunk: ./ src/java/org/apache/hadoop/chukwa/datacollection/agent/ src/java/org/apache/hadoop/chukwa/datacollection/connector/http/ src/java/org/apache/hadoop/chukwa/datacollection/writer/ src/java/org/apache/hado...

Author: asrabkin
Date: Thu Aug 13 03:05:13 2009
New Revision: 803761

URL: http://svn.apache.org/viewvc?rev=803761&view=rev
Log:
CHUKWA-373.  Test code for backpressure.

Added:
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/NullWriter.java
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/TestCollector.java
      - copied unchanged from r801506, hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/CollectorTest.java
Removed:
    hadoop/chukwa/trunk/src/test/org/apache/hadoop/chukwa/datacollection/collector/CollectorTest.java
Modified:
    hadoop/chukwa/trunk/CHANGES.txt
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java

Modified: hadoop/chukwa/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/CHANGES.txt?rev=803761&r1=803760&r2=803761&view=diff
==============================================================================
--- hadoop/chukwa/trunk/CHANGES.txt (original)
+++ hadoop/chukwa/trunk/CHANGES.txt Thu Aug 13 03:05:13 2009
@@ -46,6 +46,8 @@
 
   IMPROVEMENTS
 
+    CHUKWA-373.  Test code for backpressure. (asrabkin)
+
     CHUKWA-370.  Exec adaptor should commit immediately. (asrabkin)
 
     CHUKWA-367.  Print metadata in DumpChunks. (asrabkin)

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java?rev=803761&r1=803760&r2=803761&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java Thu Aug 13 03:05:13 2009
@@ -53,6 +53,12 @@
     synchronized (this) {
       while (chunk.getData().length + dataSize > MAX_MEM_USAGE) {
         try {
+          if(dataSize == 0) { //queue is empty, but data is still too big
+            log.error("JUMBO CHUNK SPOTTED: type= " + chunk.getDataType() + 
+                " and source =" +chunk.getStreamName()); 
+            return; //return without sending; otherwise we'd deadlock.
+            //this error should probably be fatal; there's no way to recover.
+          }
           metrics.fullQueue.set(1);
           this.wait();
           log.info("MemLimitQueue is full [" + dataSize + "]");

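The early return above is load-bearing: add() waits for space while holding the queue's monitor, and a chunk bigger than MAX_MEM_USAGE can never fit, even into an empty queue, so without the check the caller would wait forever. A minimal sketch of the pattern in plain Java (a simplified stand-in, not the actual MemLimitQueue API):

    final class BoundedByteQueue {
      private final int maxBytes;
      private int usedBytes = 0;

      BoundedByteQueue(int maxBytes) { this.maxBytes = maxBytes; }

      synchronized void put(byte[] item) throws InterruptedException {
        if (usedBytes == 0 && item.length > maxBytes) {
          // Even an empty queue cannot hold this item, and no consumer
          // can ever free enough space; waiting would deadlock, so drop it.
          System.err.println("dropping jumbo item of " + item.length + " bytes");
          return;
        }
        while (usedBytes + item.length > maxBytes)
          wait();               // woken by remove() when space frees up
        usedBytes += item.length;
      }

      synchronized void remove(int nBytes) {
        usedBytes -= nBytes;
        notifyAll();            // let blocked producers re-check for space
      }
    }
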
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java?rev=803761&r1=803760&r2=803761&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java Thu Aug 13 03:05:13 2009
@@ -54,11 +54,16 @@
 
   static Logger log = Logger.getLogger(HttpConnector.class);
 
-  static Timer statTimer = null;
-  static volatile int chunkCount = 0;
-  static final int MAX_SIZE_PER_POST = 2 * 1024 * 1024;
-  static final int MIN_POST_INTERVAL = 5 * 1000;
-  static ChunkQueue chunkQueue;
+  Timer statTimer = null;
+  volatile int chunkCount = 0;
+  
+  int MAX_SIZE_PER_POST = 2 * 1024 * 1024;
+  int MIN_POST_INTERVAL = 5 * 1000;
+  public static final String MIN_POST_INTERVAL_OPT = "httpConnector.minPostInterval";
+  public static final String MAX_SIZE_PER_POST_OPT = "httpConnector.maxPostSize";
+
+  ChunkQueue chunkQueue;
 
   ChukwaAgent agent;
   String argDestination = null;
@@ -68,9 +73,8 @@
   private Iterator<String> collectors = null;
   protected ChukwaSender connectorClient = null;
 
-  static {
+  { //instance initializer block
     statTimer = new Timer();
-    chunkQueue = DataFactory.getInstance().getEventQueue();
     statTimer.schedule(new TimerTask() {
       public void run() {
         int count = chunkCount;
@@ -93,6 +97,12 @@
   }
 
   public void start() {
+
+    chunkQueue = DataFactory.getInstance().getEventQueue();
+    MAX_SIZE_PER_POST = agent.getConfiguration().getInt(MAX_SIZE_PER_POST_OPT,
+        MAX_SIZE_PER_POST);
+    MIN_POST_INTERVAL = agent.getConfiguration().getInt(MIN_POST_INTERVAL_OPT,
+        MIN_POST_INTERVAL);
     (new Thread(this, "HTTP post thread")).start();
   }
 
@@ -168,8 +178,7 @@
           Thread.sleep(now - lastPost); // wait for stuff to accumulate
         lastPost = now;
       } // end of try forever loop
-      log
-          .info("received stop() command so exiting run() loop to shutdown connector");
+      log.info("received stop() command so exiting run() loop to shutdown connector");
     } catch (OutOfMemoryError e) {
       log.warn("Bailing out", e);
       DaemonWatcher.bailout(-1);

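With the limits demoted from static finals to instance fields read in start(), post size and frequency become per-agent tunables. A sketch of setting them programmatically, using the option names added above (the values are examples, and the keys should work equally in the agent's configuration file); start() reads them once, so set them before starting the connector:

    // Example values only: 1 MB posts, at least 2 seconds apart.
    static void tuneConnector(ChukwaAgent agent) {
      Configuration conf = agent.getConfiguration();
      conf.setInt(HttpConnector.MAX_SIZE_PER_POST_OPT, 1024 * 1024);
      conf.setInt(HttpConnector.MIN_POST_INTERVAL_OPT, 2 * 1000);
    }
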
Added: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/NullWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/NullWriter.java?rev=803761&view=auto
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/NullWriter.java (added)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/NullWriter.java Thu Aug 13 03:05:13 2009
@@ -0,0 +1,42 @@
+package org.apache.hadoop.chukwa.datacollection.writer;
+
+import java.util.List;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Minimal writer; does nothing with data.
+ * 
+ * Useful primarily as an end-of-pipeline stage, when the stages
+ * upstream of it are accomplishing something useful.
+ *
+ */
+public class NullWriter implements ChukwaWriter {
+  
+  // maximum data rate, in KB per second; a value <= 0 disables throttling
+  int maxDataRate = Integer.MAX_VALUE;
+  public static final String RATE_OPT_NAME = "nullWriter.dataRate";
+  @Override
+  public void add(List<Chunk> chunks) throws WriterException {
+    try {
+      int dataBytes = 0;
+      for(Chunk c : chunks)
+        dataBytes += c.getData().length;
+      if(maxDataRate > 0) // sleep dataBytes/maxDataRate ms: caps throughput near maxDataRate KB/sec
+        Thread.sleep(dataBytes / maxDataRate);
+    } catch(Exception e) {} // interruption is harmless here; data is discarded anyway
+    return;
+  }
+
+  @Override
+  public void close() throws WriterException {
+    return;
+  }
+
+  @Override
+  public void init(Configuration c) throws WriterException {
+    maxDataRate = c.getInt(RATE_OPT_NAME, 0); // default 0: pass data through unthrottled
+    return;
+  }
+
+}

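Since add() sleeps dataBytes / maxDataRate milliseconds per batch, a rate of n holds throughput to roughly n KB/sec (one byte per millisecond is close to one KB per second). A sketch of a throttled, do-nothing pipeline for collector load tests; the two pipeline keys are assumptions based on the collector's usual pipelined-writer configuration, so verify them against your setup:

    // Hypothetical load-test config: all data ends in NullWriter,
    // which discards it at no more than ~100 KB/sec.
    static Configuration throttledNullPipeline() {
      Configuration conf = new Configuration();
      conf.set("chukwaCollector.writerClass",
          "org.apache.hadoop.chukwa.datacollection.writer.PipelineStageWriter");
      conf.set("chukwaCollector.pipeline",
          "org.apache.hadoop.chukwa.datacollection.writer.NullWriter");
      conf.setInt(NullWriter.RATE_OPT_NAME, 100);
      return conf;
    }
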
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java?rev=803761&r1=803760&r2=803761&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java Thu Aug 13 03:05:13 2009
@@ -50,7 +50,7 @@
  * TeeWriter ---> Client    (Chunk serialized as Writable)*
  *              An indefinite sequence of serialized chunks
  *              
- *  In english: clients should connect and say either "RAW " or "WRITABLE " 
+ *  In English: clients should connect and say either "RAW " or "WRITABLE " 
  *  followed by a filter.  (Note that the keyword is followed by exactly one space.)
  *  They'll then receive either a sequence of byte arrays or of writable-serialized.
  *  
@@ -204,8 +204,10 @@
     }
 
     public void handle(Chunk c) {
+      
+      //don't ever block; just ignore this chunk if we don't have room for it.
       if(rules.matches(c)) 
-        sendQ.add(c);
+        sendQ.offer(c);
     }
   }
 

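The add-to-offer switch is the substance of this change: on a bounded BlockingQueue, add() throws IllegalStateException when the queue is full, while offer() returns false and moves on. Assuming the per-client sendQ is bounded (e.g. an ArrayBlockingQueue), the difference looks like:

    // Sketch, not the writer's actual code; needs java.util.concurrent.BlockingQueue
    // and org.apache.hadoop.chukwa.Chunk.
    static void handleNonBlocking(BlockingQueue<Chunk> sendQ, Chunk c) {
      // add() would throw once the queue fills, letting one slow tee
      // client break the writer thread; offer() returns false instead.
      if (!sendQ.offer(c)) {
        // client too slow: drop the chunk silently rather than block
      }
    }
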
Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java?rev=803761&r1=803760&r2=803761&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java Thu Aug 13 03:05:13 2009
@@ -20,16 +20,17 @@
 
 
 import java.util.Random;
-import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.*;
 import org.apache.hadoop.chukwa.datacollection.*;
 import org.apache.hadoop.chukwa.datacollection.adaptor.*;
 import org.apache.hadoop.chukwa.datacollection.agent.AdaptorManager;
+import org.apache.hadoop.conf.Configuration;
 
 public class ConstRateAdaptor extends Thread implements Adaptor {
 
-  private static final int SLEEP_VARIANCE = 200;
-  private static final int MIN_SLEEP = 300;
+  private int SLEEP_VARIANCE = 200;
+  private int MIN_SLEEP = 300;
 
   private String type;
   private long offset;
@@ -51,6 +52,9 @@
     this.type = type;
     this.dest = dest;
     this.setName("ConstRate Adaptor_" + type);
+    Configuration conf = c.getConfiguration();
+    MIN_SLEEP = conf.getInt("constAdaptor.minSleep", MIN_SLEEP);
+    SLEEP_VARIANCE = conf.getInt("constAdaptor.sleepVariance", SLEEP_VARIANCE);
     super.start(); // this is a Thread.start
   }
 
@@ -65,16 +69,17 @@
   }
 
   public void run() {
-    Random r = new Random();
+    Random timeCoin = new Random();
     try {
       while (!stopping) {
-        int MSToSleep = r.nextInt(SLEEP_VARIANCE) + MIN_SLEEP; // between 1 and
-                                                               // 3 secs
+        int MSToSleep = timeCoin.nextInt(SLEEP_VARIANCE) + MIN_SLEEP;
+        // sleep between MIN_SLEEP and MIN_SLEEP + SLEEP_VARIANCE ms (configurable above)
         // FIXME: I think there's still a risk of integer overflow here
         int arraySize = (int) (MSToSleep * (long) bytesPerSec / 1000L);
         byte[] data = new byte[arraySize];
-        r.nextBytes(data);
+        // seed with the chunk's start offset, so receivers can regenerate the payload
+        Random dataPattern = new Random(offset);
         offset += data.length;
+        dataPattern.nextBytes(data);
         ChunkImpl evt = new ChunkImpl(type, "random data source", offset, data,
             this);
 
@@ -106,16 +111,28 @@
   }
 
 
-    @Override
-    public long shutdown(AdaptorShutdownPolicy shutdownPolicy) {
-      
-      switch(shutdownPolicy) {
-        case HARD_STOP :
-        case GRACEFULLY : 
-        case WAIT_TILL_FINISHED :
-          stopping = true;
-        break;
-      }
-      return offset;
+  @Override
+  public long shutdown(AdaptorShutdownPolicy shutdownPolicy) {
+    
+    switch(shutdownPolicy) {
+      case HARD_STOP :
+      case GRACEFULLY : 
+      case WAIT_TILL_FINISHED :
+        stopping = true;
+      break;
     }
+    return offset;
+  }
+  
+  public static boolean checkChunk(Chunk chunk) {
+    byte[] data = chunk.getData();
+    byte[] correctData = new byte[data.length];
+    // the chunk's seqID is its end offset; the generator seeded with the start offset
+    Random dataPattern = new Random(chunk.getSeqID() - data.length);
+    dataPattern.nextBytes(correctData);
+    for(int i = 0; i < data.length; ++i)
+      if(data[i] != correctData[i])
+        return false;
+
+    return true;
+  }
 }
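
Seeding each chunk's payload with its start offset makes the generated stream self-verifying: any downstream stage can regenerate the expected bytes from the chunk's own metadata and compare. A sketch of how a test might use this (collectedChunks is hypothetical; the capture step is elided):

    static void assertAllValid(Iterable<Chunk> collectedChunks) {
      for (Chunk c : collectedChunks) {
        // regenerate the expected pattern and compare byte-for-byte
        if (!ConstRateAdaptor.checkChunk(c))
          throw new AssertionError("corrupt chunk, seqID=" + c.getSeqID());
      }
    }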