You are viewing a plain-text version of this message; the canonical (HTML) version is available in the Apache mailing-list archive.
Posted to commits@chukwa.apache.org by sh...@apache.org on 2014/07/01 23:39:38 UTC

svn commit: r1607199 - in /chukwa/trunk: conf/ src/main/java/org/apache/hadoop/chukwa/datacollection/ src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ src/test/java/org/apache/hadoop/chukwa/datacollection/agent/

Author: shreyas
Date: Tue Jul  1 21:39:37 2014
New Revision: 1607199

URL: http://svn.apache.org/r1607199
Log:
CHUKWA-714 Make ChunkQueue configurable

Added:
    chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/NonBlockingMemLimitQueue.java
    chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestChunkQueue.java
Modified:
    chukwa/trunk/conf/chukwa-agent-conf.xml
    chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java
    chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
    chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java

Modified: chukwa/trunk/conf/chukwa-agent-conf.xml
URL: http://svn.apache.org/viewvc/chukwa/trunk/conf/chukwa-agent-conf.xml?rev=1607199&r1=1607198&r2=1607199&view=diff
==============================================================================
--- chukwa/trunk/conf/chukwa-agent-conf.xml (original)
+++ chukwa/trunk/conf/chukwa-agent-conf.xml Tue Jul  1 21:39:37 2014
@@ -82,6 +82,22 @@
   </property>

   <property>
+    <name>chukwaAgent.chunk.queue</name>
+    <value>org.apache.hadoop.chukwa.datacollection.agent.MemLimitQueue</value>
+    <description>Fully qualified class name of the ChunkQueue used by the agent.
+    MemLimitQueue blocks the caller of add() while the queue is full;
+    NonBlockingMemLimitQueue discards incoming chunks instead. Falls back to
+    MemLimitQueue if the class cannot be instantiated.</description>
+  </property>
+
+  <property>
+    <name>chukwaAgent.chunk.queue.limit</name>
+    <value>10485760</value>
+    <description>Maximum amount of chunk data, in bytes, held by the queue
+    before it is treated as full. Defaults to 10485760 (10 MiB).</description>
+  </property>
+
+  <property>
     <name>syslog.adaptor.port.9095.facility.LOCAL1</name>
     <value>HADOOP</value>
   </property>

Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java?rev=1607199&r1=1607198&r2=1607199&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java Tue Jul  1 21:39:37 2014
@@ -21,17 +21,20 @@ package org.apache.hadoop.chukwa.datacol
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Constructor;
 import java.util.Iterator;
 
 import org.apache.hadoop.conf.*;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
 import org.apache.hadoop.chukwa.datacollection.agent.MemLimitQueue;
 import org.apache.hadoop.chukwa.datacollection.sender.RetryListOfCollectors;
 import org.apache.log4j.Logger;
 
 public class DataFactory {
   static Logger log = Logger.getLogger(DataFactory.class);
-  static final int QUEUE_SIZE_KB = 10 * 1024;
   static final String COLLECTORS_FILENAME = "collectors";
+  static final String CHUNK_QUEUE = "chukwaAgent.chunk.queue";
+  
   private static DataFactory dataFactory = null;
   private ChunkQueue chunkQueue = null;
 
@@ -50,10 +53,74 @@ public class DataFactory {

   public synchronized ChunkQueue getEventQueue() {
     if (chunkQueue == null) {
-      chunkQueue = new MemLimitQueue(QUEUE_SIZE_KB * 1024);
+      chunkQueue = createEventQueue();
     }
     return chunkQueue;
   }
+
+  /**
+   * Creates a new ChunkQueue of the class named by {@value #CHUNK_QUEUE} in
+   * the agent configuration.
+   * <p>
+   * The class must implement {@link ChunkQueue}. A public constructor taking a
+   * {@link Configuration} is preferred; otherwise the public no-argument
+   * constructor is used. If no agent configuration is available, the property
+   * is unset or blank, or the class cannot be loaded or instantiated, a
+   * {@link MemLimitQueue} is returned instead.
+   * <p>
+   * Unlike {@link #getEventQueue()}, every call creates a new instance.
+   *
+   * @return a new queue; never {@code null}
+   */
+  public synchronized ChunkQueue createEventQueue() {
+    Configuration conf = ChukwaAgent.getStaticConfiguration();
+    if (conf == null) {
+      // No agent configuration (e.g. in unit tests): use the default queue
+      // with its built-in defaults.
+      return new MemLimitQueue(null);
+    }
+    String receiver = conf.get(CHUNK_QUEUE);
+    if (receiver == null || receiver.trim().length() == 0) {
+      log.warn("Empty configuration for " + CHUNK_QUEUE + ". Defaulting to MemLimitQueue");
+      return new MemLimitQueue(conf);
+    }
+    receiver = receiver.trim();
+    try {
+      return instantiateQueue(receiver, conf);
+    } catch (Exception e) {
+      // Pass the exception itself so the underlying cause (e.g. one thrown
+      // by the queue's constructor) is logged with its stack trace.
+      log.error("Could not instantiate configured ChunkQueue " + receiver
+          + ". Defaulting to MemLimitQueue", e);
+      return new MemLimitQueue(conf);
+    }
+  }
+
+  /**
+   * Loads {@code className} and instantiates it as a ChunkQueue, preferring a
+   * public {@code (Configuration)} constructor over a public no-argument one.
+   *
+   * @throws Exception if the class cannot be loaded, does not implement
+   *           ChunkQueue, has no usable constructor, or construction fails
+   */
+  private static ChunkQueue instantiateQueue(String className,
+      Configuration conf) throws Exception {
+    Class<?> clazz = Class.forName(className);
+    if (!ChunkQueue.class.isAssignableFrom(clazz)) {
+      throw new IllegalArgumentException(className
+          + " does not implement ChunkQueue");
+    }
+    log.info("Instantiating ChunkQueue " + clazz.getName());
+    Class<? extends ChunkQueue> queueClass = clazz.asSubclass(ChunkQueue.class);
+    try {
+      Constructor<? extends ChunkQueue> ctor =
+          queueClass.getConstructor(Configuration.class);
+      return ctor.newInstance(conf);
+    } catch (NoSuchMethodException nsme) {
+      // Queue implementations which take no configuration parameter
+      return queueClass.getConstructor().newInstance();
+    }
+  }

   public String getDefaultTags() {
     return defaultTags;

Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=1607199&r1=1607198&r2=1607199&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java Tue Jul  1 21:39:37 2014
@@ -716,6 +716,17 @@ public class ChukwaAgent implements Adap
     return conf;
   }
   
+  /**
+   * Returns the agent configuration held in the static {@code conf} field,
+   * for callers that do not hold a reference to a ChukwaAgent instance.
+   *
+   * @return the shared configuration; may be {@code null} (callers such as
+   *         DataFactory treat {@code null} as running without an agent)
+   */
+  public static Configuration getStaticConfiguration() {
+    return conf;
+  }
+
   @Override
   public Adaptor getAdaptor(String name) {
     synchronized(adaptorsByName) {

Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java?rev=1607199&r1=1607198&r2=1607199&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java Tue Jul  1 21:39:37 2014
@@ -25,6 +25,7 @@ import java.util.Queue;
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
 import org.apache.hadoop.chukwa.datacollection.agent.metrics.ChunkQueueMetrics;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
 
 /**
@@ -39,10 +40,19 @@ public class MemLimitQueue implements Ch
   static final ChunkQueueMetrics metrics = new ChunkQueueMetrics("chukwaAgent", "chunkQueue");;
   private Queue<Chunk> queue = new LinkedList<Chunk>();
   private long dataSize = 0;
   private final long MAX_MEM_USAGE;
+  static final String CHUNK_QUEUE_LIMIT = "chukwaAgent.chunk.queue.limit";
+  static final int QUEUE_SIZE = 10 * 1024 * 1024;

-  public MemLimitQueue(int limit) {
-    MAX_MEM_USAGE = limit;
+  /**
+   * Creates a queue whose size limit is read from the
+   * {@value #CHUNK_QUEUE_LIMIT} property of {@code conf}.
+   *
+   * @param conf configuration to read the limit from; may be {@code null},
+   *          in which case the default of {@link #QUEUE_SIZE} is used
+   */
+  public MemLimitQueue(Configuration conf) {
+    MAX_MEM_USAGE = readLimit(conf);
   }

   /**
@@ -111,4 +121,38 @@ public class MemLimitQueue implements Ch
   public int size() {
     return queue.size();
   }
+
+  /**
+   * Reads the queue size limit from {@value #CHUNK_QUEUE_LIMIT}.
+   *
+   * @param conf configuration to read; may be {@code null}
+   * @return the configured limit, or {@link #QUEUE_SIZE} if {@code conf} is
+   *         {@code null} or the property is unset, unparsable or not positive
+   */
+  private long readLimit(Configuration conf) {
+    long limit = QUEUE_SIZE;
+    if (conf == null) {
+      return limit;
+    }
+    String value = conf.get(CHUNK_QUEUE_LIMIT);
+    if (value != null) {
+      try {
+        // Parse as long: the limit is a long, so values above
+        // Integer.MAX_VALUE must not be rejected.
+        long parsed = Long.parseLong(value.trim());
+        if (parsed > 0) {
+          limit = parsed;
+        } else {
+          log.error("Non-positive value " + parsed + " for property "
+              + CHUNK_QUEUE_LIMIT + ". Defaulting internal queue size to "
+              + QUEUE_SIZE);
+        }
+      } catch (NumberFormatException nfe) {
+        log.error("Exception reading property " + CHUNK_QUEUE_LIMIT
+            + ". Defaulting internal queue size to " + QUEUE_SIZE, nfe);
+      }
+    }
+    log.info("Using MemLimitQueue limit of " + limit);
+    return limit;
+  }
 }

Added: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/NonBlockingMemLimitQueue.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/NonBlockingMemLimitQueue.java?rev=1607199&view=auto
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/NonBlockingMemLimitQueue.java (added)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/NonBlockingMemLimitQueue.java Tue Jul  1 21:39:37 2014
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.agent;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
+import org.apache.hadoop.chukwa.datacollection.agent.metrics.ChunkQueueMetrics;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+
+/**
+ * An event queue that discards incoming chunks once a fixed upper limit of data
+ * is enqueued. The method calling add will not block.
+ * <p>
+ * The limit is read from {@value #CHUNK_QUEUE_LIMIT} and defaults to
+ * {@link #QUEUE_SIZE} bytes.
+ * <p>
+ * For now, uses the size of the data field. Should really use
+ * estimatedSerializedSize()?
+ * <p>
+ * All access to the underlying queue is guarded by the monitor of this object.
+ */
+public class NonBlockingMemLimitQueue implements ChunkQueue {
+  static Logger log = Logger.getLogger(NonBlockingMemLimitQueue.class);
+  // NOTE(review): uses the same metrics names as MemLimitQueue; confirm
+  // ChunkQueueMetrics tolerates both classes being loaded in one JVM.
+  static final ChunkQueueMetrics metrics = new ChunkQueueMetrics("chukwaAgent",
+      "chunkQueue");
+  static final String CHUNK_QUEUE_LIMIT = "chukwaAgent.chunk.queue.limit";
+  static final int QUEUE_SIZE = 10 * 1024 * 1024;
+  private final Queue<Chunk> queue = new LinkedList<Chunk>();
+  private long dataSize = 0;
+  private final long MAX_MEM_USAGE;
+
+  /**
+   * Creates a queue whose size limit is read from {@code conf}.
+   *
+   * @param conf configuration to read {@value #CHUNK_QUEUE_LIMIT} from; may be
+   *          {@code null}, in which case {@link #QUEUE_SIZE} is used
+   */
+  public NonBlockingMemLimitQueue(Configuration conf) {
+    MAX_MEM_USAGE = readLimit(conf);
+  }
+
+  /**
+   * Enqueues {@code chunk} if it fits within the limit; otherwise logs and
+   * discards it. Never blocks waiting for space.
+   *
+   * @see org.apache.hadoop.chukwa.datacollection.ChunkQueue#add(org.apache.hadoop.chukwa.Chunk)
+   */
+  public void add(Chunk chunk) throws InterruptedException {
+    assert chunk != null : "can't enqueue null chunks";
+    int chunkSize = chunk.getData().length;
+    synchronized (this) {
+      if (chunkSize + dataSize > MAX_MEM_USAGE) {
+        if (dataSize == 0) { // queue is empty, but data is still too big
+          // The chunk can never fit, so it is dropped.
+          log.error("JUMBO CHUNK SPOTTED: type= " + chunk.getDataType()
+              + " and source =" + chunk.getStreamName());
+        } else {
+          metrics.fullQueue.set(1);
+          log.warn("Discarding chunk due to NonBlockingMemLimitQueue full [" + dataSize
+              + "]");
+        }
+        return;
+      }
+      metrics.fullQueue.set(0);
+      dataSize += chunkSize;
+      queue.add(chunk);
+      metrics.addedChunk.inc();
+      metrics.queueSize.set(queue.size());
+      metrics.dataSize.set(dataSize);
+      // Wake any collector waiting for data.
+      this.notifyAll();
+    }
+  }
+
+  /**
+   * Moves chunks into {@code events}, waiting until at least one chunk is
+   * available. Stops taking chunks once the data collected by this call
+   * reaches or exceeds {@code maxSize}.
+   *
+   * @see org.apache.hadoop.chukwa.datacollection.ChunkQueue#collect(java.util.List,
+   *      int)
+   */
+  public void collect(List<Chunk> events, int maxSize)
+      throws InterruptedException {
+    int remaining;
+    synchronized (this) {
+      // we can't just say queue.take() here, since we're holding a lock.
+      while (queue.isEmpty()) {
+        this.wait();
+      }
+
+      int size = 0;
+      while (!queue.isEmpty() && (size < maxSize)) {
+        Chunk e = this.queue.remove();
+        metrics.removedChunk.inc();
+        int chunkSize = e.getData().length;
+        size += chunkSize;
+        dataSize -= chunkSize;
+        metrics.dataSize.set(dataSize);
+        events.add(e);
+      }
+      // Read the size while still holding the lock; the underlying queue is
+      // not thread-safe on its own.
+      remaining = queue.size();
+      metrics.queueSize.set(remaining);
+      this.notifyAll();
+    }
+
+    if (log.isDebugEnabled()) {
+      log.debug("WaitingQueue.inQueueCount:" + remaining
+          + "\tWaitingQueue.collectCount:" + events.size());
+    }
+  }
+
+  /**
+   * @return the number of chunks currently queued
+   */
+  public synchronized int size() {
+    return queue.size();
+  }
+
+  /**
+   * Reads the queue size limit from {@value #CHUNK_QUEUE_LIMIT}.
+   *
+   * @param conf configuration to read; may be {@code null}
+   * @return the configured limit, or {@link #QUEUE_SIZE} if {@code conf} is
+   *         {@code null} or the property is unset, unparsable or not positive
+   */
+  private static long readLimit(Configuration conf) {
+    long limit = QUEUE_SIZE;
+    if (conf == null) {
+      return limit;
+    }
+    String value = conf.get(CHUNK_QUEUE_LIMIT);
+    if (value != null) {
+      try {
+        // Parse as long: the limit is a long, so values above
+        // Integer.MAX_VALUE must not be rejected.
+        long parsed = Long.parseLong(value.trim());
+        if (parsed > 0) {
+          limit = parsed;
+        } else {
+          log.error("Non-positive value " + parsed + " for property "
+              + CHUNK_QUEUE_LIMIT + ". Defaulting internal queue size to "
+              + QUEUE_SIZE);
+        }
+      } catch (NumberFormatException nfe) {
+        log.error("Exception reading property " + CHUNK_QUEUE_LIMIT
+            + ". Defaulting internal queue size to " + QUEUE_SIZE, nfe);
+      }
+    }
+    log.info("Using NonBlockingMemLimitQueue limit of " + limit);
+    return limit;
+  }
+}

Added: chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestChunkQueue.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestChunkQueue.java?rev=1607199&view=auto
==============================================================================
--- chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestChunkQueue.java (added)
+++ chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestChunkQueue.java Tue Jul  1 21:39:37 2014
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.agent;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
+import org.apache.hadoop.chukwa.datacollection.DataFactory;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent.AlreadyRunningException;
+import org.apache.hadoop.conf.Configuration;
+
+import junit.framework.TestCase;
+
+/**
+ * Verifies that the ChunkQueue implementation selected through
+ * {@code chukwaAgent.chunk.queue} honours its blocking or non-blocking
+ * contract once the configured memory limit is reached.
+ */
+public class TestChunkQueue extends TestCase {
+
+  private static final int NUM_CHUNKS = 10;
+  // Number of test chunks that fit within the configured queue limit.
+  private static final int QUEUE_SIZE = 6;
+  private static final String CHUNK_QUEUE_LIMIT = "chukwaAgent.chunk.queue.limit";
+  private static final String CHUNK_QUEUE = "chukwaAgent.chunk.queue";
+  private static final int JOIN_TIMEOUT_MS = 3000;
+
+  private final byte[] data = "this is a chunk".getBytes();
+  private final int queueLimit = data.length * QUEUE_SIZE;
+  private final DataFactory df = DataFactory.getInstance();
+  private List<Chunk> putList, getList;
+  private ChukwaAgent agent = null;
+  private Configuration conf = null;
+
+  @Override
+  protected void setUp() throws AlreadyRunningException {
+    agent = ChukwaAgent.getAgent();
+    if (agent == null) {
+      agent = new ChukwaAgent();
+    }
+    conf = agent.getConfiguration();
+    conf.set(CHUNK_QUEUE_LIMIT, Integer.toString(queueLimit));
+    putList = new ArrayList<Chunk>(NUM_CHUNKS);
+    for (int i = 1; i <= NUM_CHUNKS; i++) {
+      putList.add(new ChunkImpl("DataType", "StreamName", (long) i, data, null));
+    }
+  }
+
+  @Override
+  protected void tearDown() {
+    if (agent != null) {
+      agent.shutdown();
+    }
+  }
+
+  public void testMemLimitQueue() {
+    conf.set(CHUNK_QUEUE, "org.apache.hadoop.chukwa.datacollection.agent.MemLimitQueue");
+    ChunkQueue mlq = df.createEventQueue();
+    assertEquals(MemLimitQueue.class, mlq.getClass());
+    testBlockingNature(mlq);
+  }
+
+  public void testNonBlockingMemLimitQueue() {
+    conf.set(CHUNK_QUEUE, "org.apache.hadoop.chukwa.datacollection.agent.NonBlockingMemLimitQueue");
+    ChunkQueue nbmlq = df.createEventQueue();
+    assertEquals(NonBlockingMemLimitQueue.class, nbmlq.getClass());
+    testNonBlockingNature(nbmlq);
+  }
+
+  /**
+   * Putter thread adds every chunk of its list to the ChunkQueue.
+   */
+  private static class Putter extends Thread {
+    private final List<Chunk> chunks;
+    private final ChunkQueue q;
+
+    Putter(List<Chunk> chunks, ChunkQueue q) {
+      this.chunks = chunks;
+      this.q = q;
+    }
+
+    @Override
+    public void run() {
+      try {
+        for (Chunk c : chunks) {
+          q.add(c);
+        }
+      } catch (InterruptedException ignored) {
+        // Interrupted by the test during cleanup; just exit.
+      }
+    }
+  }
+
+  /**
+   * Getter thread collects chunks from the ChunkQueue until interrupted.
+   */
+  private static class Getter extends Thread {
+    private final List<Chunk> chunks;
+    private final ChunkQueue q;
+
+    Getter(List<Chunk> chunks, ChunkQueue q) {
+      this.chunks = chunks;
+      this.q = q;
+    }
+
+    @Override
+    public void run() {
+      try {
+        while (true) {
+          q.collect(chunks, Integer.MAX_VALUE);
+        }
+      } catch (InterruptedException ignored) {
+        // Interrupted by the test during cleanup; just exit.
+      }
+    }
+  }
+
+  private static void joinThread(Thread t, int timeout) {
+    try {
+      t.join(timeout);
+    } catch (InterruptedException e) {
+      // Preserve the interrupt status for the test runner.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /** Interrupts the given (possibly null) threads so they do not outlive a test. */
+  private static void stopThreads(Thread... threads) {
+    for (Thread t : threads) {
+      if (t != null) {
+        t.interrupt();
+      }
+    }
+  }
+
+  /**
+   * Makes sure that the putter thread blocks when the queue is full and that
+   * every chunk is delivered once a getter drains the queue.
+   *
+   * @param q the queue under test
+   */
+  private void testBlockingNature(ChunkQueue q) {
+    Putter putter = new Putter(putList, q);
+    Getter getter = null;
+    try {
+      putter.start();
+      joinThread(putter, JOIN_TIMEOUT_MS);
+      assertTrue("Blocking queue semantics not implemented", putter.isAlive());
+      assertEquals("Could not verify queue size after put", QUEUE_SIZE, q.size());
+      // The getter thread appends to this list, so it must be synchronized.
+      getList = Collections.synchronizedList(new ArrayList<Chunk>());
+      getter = new Getter(getList, q);
+      getter.start();
+      joinThread(getter, JOIN_TIMEOUT_MS);
+      assertEquals("Could not verify queue size after get", 0, q.size());
+      // make sure we got all chunks
+      assertEquals("Could not verify all chunks got drained after get",
+          NUM_CHUNKS, getList.size());
+    } finally {
+      // Getter never exits on its own; a blocked putter may not either.
+      stopThreads(putter, getter);
+    }
+  }
+
+  /**
+   * Makes sure that the putter thread does not block when the queue is full.
+   * Does not check whether the implementation retains the most recent chunks
+   * (circular buffer) or discards new incoming ones.
+   *
+   * @param q the queue under test
+   */
+  private void testNonBlockingNature(ChunkQueue q) {
+    Putter putter = new Putter(putList, q);
+    Getter getter = null;
+    try {
+      putter.start();
+      joinThread(putter, JOIN_TIMEOUT_MS);
+      assertFalse("Non Blocking queue semantics not implemented", putter.isAlive());
+      assertEquals("Could not verify queue size after put", QUEUE_SIZE, q.size());
+      // The getter thread appends to this list, so it must be synchronized.
+      getList = Collections.synchronizedList(new ArrayList<Chunk>());
+      getter = new Getter(getList, q);
+      getter.start();
+      joinThread(getter, JOIN_TIMEOUT_MS);
+      assertEquals("Could not verify all chunks got drained after get", 0, q.size());
+      // only the chunks that fit in the queue should have been delivered
+      assertEquals("Could not verify chunks after get", QUEUE_SIZE, getList.size());
+    } finally {
+      stopThreads(putter, getter);
+    }
+  }
+}