You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@chukwa.apache.org by sh...@apache.org on 2014/07/01 23:39:38 UTC
svn commit: r1607199 - in /chukwa/trunk: conf/
src/main/java/org/apache/hadoop/chukwa/datacollection/
src/main/java/org/apache/hadoop/chukwa/datacollection/agent/
src/test/java/org/apache/hadoop/chukwa/datacollection/agent/
Author: shreyas
Date: Tue Jul 1 21:39:37 2014
New Revision: 1607199
URL: http://svn.apache.org/r1607199
Log:
CHUKWA-714 Make ChunkQueue configurable
Added:
chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/NonBlockingMemLimitQueue.java
chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestChunkQueue.java
Modified:
chukwa/trunk/conf/chukwa-agent-conf.xml
chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java
chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java
Modified: chukwa/trunk/conf/chukwa-agent-conf.xml
URL: http://svn.apache.org/viewvc/chukwa/trunk/conf/chukwa-agent-conf.xml?rev=1607199&r1=1607198&r2=1607199&view=diff
==============================================================================
--- chukwa/trunk/conf/chukwa-agent-conf.xml (original)
+++ chukwa/trunk/conf/chukwa-agent-conf.xml Tue Jul 1 21:39:37 2014
@@ -82,6 +82,17 @@
</property>
<property>
+ <name>chukwaAgent.chunk.queue</name>
+ <value>org.apache.hadoop.chukwa.datacollection.agent.MemLimitQueue</value>
+ </property>
+
+ <property>
+ <name>chukwaAgent.chunk.queue.limit</name>
+ <value>10485760</value>
+ </property>
+
+
+ <property>
<name>syslog.adaptor.port.9095.facility.LOCAL1</name>
<value>HADOOP</value>
</property>
Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java?rev=1607199&r1=1607198&r2=1607199&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/DataFactory.java Tue Jul 1 21:39:37 2014
@@ -21,17 +21,20 @@ package org.apache.hadoop.chukwa.datacol
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.Constructor;
import java.util.Iterator;
import org.apache.hadoop.conf.*;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.hadoop.chukwa.datacollection.agent.MemLimitQueue;
import org.apache.hadoop.chukwa.datacollection.sender.RetryListOfCollectors;
import org.apache.log4j.Logger;
public class DataFactory {
static Logger log = Logger.getLogger(DataFactory.class);
- static final int QUEUE_SIZE_KB = 10 * 1024;
static final String COLLECTORS_FILENAME = "collectors";
+ static final String CHUNK_QUEUE = "chukwaAgent.chunk.queue";
+
private static DataFactory dataFactory = null;
private ChunkQueue chunkQueue = null;
@@ -50,10 +53,45 @@ public class DataFactory {
public synchronized ChunkQueue getEventQueue() {
if (chunkQueue == null) {
- chunkQueue = new MemLimitQueue(QUEUE_SIZE_KB * 1024);
+ chunkQueue = createEventQueue();
}
return chunkQueue;
}
+
+ public synchronized ChunkQueue createEventQueue() {
+ Configuration conf = ChukwaAgent.getStaticConfiguration();
+ if(conf == null){
+ //Must be a unit test, use default queue with default configuration
+ return new MemLimitQueue(null);
+ }
+ String receiver = conf.get(CHUNK_QUEUE);
+ ChunkQueue queue = null;
+ if(receiver == null){
+ log.warn("Empty configuration for " + CHUNK_QUEUE + ". Defaulting to MemLimitQueue");
+ queue = new MemLimitQueue(conf);
+ return queue;
+ }
+
+ try {
+ Class<?> clazz = Class.forName(receiver);
+ log.info(clazz);
+ if(!ChunkQueue.class.isAssignableFrom(clazz)){
+ throw new Exception(receiver + " is not an instance of ChunkQueue");
+ }
+ try {
+ Constructor<?> ctor = clazz.getConstructor(new Class[]{Configuration.class});
+ queue = (ChunkQueue) ctor.newInstance(conf);
+ } catch(NoSuchMethodException nsme){
+ //Queue implementations which take no configuration parameter
+ queue = (ChunkQueue) clazz.newInstance();
+ }
+ } catch(Exception e) {
+ log.error("Could not instantiate configured ChunkQueue due to: " + e);
+ log.error("Defaulting to MemLimitQueue");
+ queue = new MemLimitQueue(conf);
+ }
+ return queue;
+ }
public String getDefaultTags() {
return defaultTags;
Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java?rev=1607199&r1=1607198&r2=1607199&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/ChukwaAgent.java Tue Jul 1 21:39:37 2014
@@ -716,6 +716,10 @@ public class ChukwaAgent implements Adap
return conf;
}
+ public static Configuration getStaticConfiguration() {
+ return conf;
+ }
+
@Override
public Adaptor getAdaptor(String name) {
synchronized(adaptorsByName) {
Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java?rev=1607199&r1=1607198&r2=1607199&view=diff
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java (original)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/MemLimitQueue.java Tue Jul 1 21:39:37 2014
@@ -25,6 +25,7 @@ import java.util.Queue;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
import org.apache.hadoop.chukwa.datacollection.agent.metrics.ChunkQueueMetrics;
+import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
/**
@@ -39,10 +40,12 @@ public class MemLimitQueue implements Ch
static final ChunkQueueMetrics metrics = new ChunkQueueMetrics("chukwaAgent", "chunkQueue");;
private Queue<Chunk> queue = new LinkedList<Chunk>();
private long dataSize = 0;
- private final long MAX_MEM_USAGE;
+ private long MAX_MEM_USAGE;
+ static final String CHUNK_QUEUE_LIMIT = "chukwaAgent.chunk.queue.limit";
+ static final int QUEUE_SIZE = 10 * 1024 * 1024;
- public MemLimitQueue(int limit) {
- MAX_MEM_USAGE = limit;
+ public MemLimitQueue(Configuration conf) {
+ configure(conf);
}
/**
@@ -111,4 +114,21 @@ public class MemLimitQueue implements Ch
public int size() {
return queue.size();
}
+
+ private void configure(Configuration conf) {
+ MAX_MEM_USAGE = QUEUE_SIZE;
+ if(conf == null){
+ return;
+ }
+ String limit = conf.get(CHUNK_QUEUE_LIMIT);
+ if(limit != null){
+ try{
+ MAX_MEM_USAGE = Integer.parseInt(limit);
+ } catch(NumberFormatException nfe) {
+ log.error("Exception reading property " + CHUNK_QUEUE_LIMIT
+ + ". Defaulting internal queue size to " + QUEUE_SIZE);
+ }
+ }
+ log.info("Using MemLimitQueue limit of " + MAX_MEM_USAGE);
+ }
}
Added: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/NonBlockingMemLimitQueue.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/NonBlockingMemLimitQueue.java?rev=1607199&view=auto
==============================================================================
--- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/NonBlockingMemLimitQueue.java (added)
+++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/NonBlockingMemLimitQueue.java Tue Jul 1 21:39:37 2014
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.agent;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
+import org.apache.hadoop.chukwa.datacollection.agent.metrics.ChunkQueueMetrics;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+
+/**
+ * An event queue that discards incoming chunks once a fixed upper limit of data
+ * is enqueued. The method calling add will not block.
+ *
+ * For now, uses the size of the data field. Should really use
+ * estimatedSerializedSize()?
+ *
+ */
+public class NonBlockingMemLimitQueue implements ChunkQueue {
+ static Logger log = Logger.getLogger(NonBlockingMemLimitQueue.class);
+ static final ChunkQueueMetrics metrics = new ChunkQueueMetrics("chukwaAgent",
+ "chunkQueue");
+ static final String CHUNK_QUEUE_LIMIT = "chukwaAgent.chunk.queue.limit";
+ static final int QUEUE_SIZE = 10 * 1024 * 1024;
+ private Queue<Chunk> queue = new LinkedList<Chunk>();
+ private long dataSize = 0;
+ private long MAX_MEM_USAGE;
+
+ public NonBlockingMemLimitQueue(Configuration conf) {
+ configure(conf);
+ }
+
+ /**
+ * @see org.apache.hadoop.chukwa.datacollection.ChunkQueue#add(org.apache.hadoop.chukwa.Chunk)
+ */
+ public void add(Chunk chunk) throws InterruptedException {
+ assert chunk != null : "can't enqueue null chunks";
+ int chunkSize = chunk.getData().length;
+ synchronized (this) {
+ if (chunkSize + dataSize > MAX_MEM_USAGE) {
+ if (dataSize == 0) { // queue is empty, but data is still too big
+ log.error("JUMBO CHUNK SPOTTED: type= " + chunk.getDataType()
+ + " and source =" + chunk.getStreamName());
+ return; // drop the chunk: it alone exceeds the queue limit, so it can never be enqueued.
+ // this error should probably be fatal; there's no way to
+ // recover.
+ } else {
+ metrics.fullQueue.set(1);
+ log.warn("Discarding chunk due to NonBlockingMemLimitQueue full [" + dataSize
+ + "]");
+ return;
+ }
+ }
+ metrics.fullQueue.set(0);
+ dataSize += chunk.getData().length;
+ queue.add(chunk);
+ metrics.addedChunk.inc();
+ metrics.queueSize.set(queue.size());
+ metrics.dataSize.set(dataSize);
+ this.notifyAll();
+ }
+ }
+
+ /**
+ * @see org.apache.hadoop.chukwa.datacollection.ChunkQueue#collect(java.util.List,
+ * int)
+ */
+ public void collect(List<Chunk> events, int maxSize)
+ throws InterruptedException {
+ synchronized (this) {
+ // we can't just say queue.take() here, since we're holding a lock.
+ while (queue.isEmpty()) {
+ this.wait();
+ }
+
+ int size = 0;
+ while (!queue.isEmpty() && (size < maxSize)) {
+ Chunk e = this.queue.remove();
+ metrics.removedChunk.inc();
+ int chunkSize = e.getData().length;
+ size += chunkSize;
+ dataSize -= chunkSize;
+ metrics.dataSize.set(dataSize);
+ events.add(e);
+ }
+ metrics.queueSize.set(queue.size());
+ this.notifyAll();
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("WaitingQueue.inQueueCount:" + queue.size()
+ + "\tWaitingQueue.collectCount:" + events.size());
+ }
+ }
+
+ public int size() {
+ return queue.size();
+ }
+
+ private void configure(Configuration conf) {
+ MAX_MEM_USAGE = QUEUE_SIZE;
+ if(conf == null){
+ return;
+ }
+ String limit = conf.get(CHUNK_QUEUE_LIMIT);
+ if(limit != null){
+ try{
+ MAX_MEM_USAGE = Integer.parseInt(limit);
+ } catch(NumberFormatException nfe) {
+ log.error("Exception reading property " + CHUNK_QUEUE_LIMIT
+ + ". Defaulting internal queue size to " + QUEUE_SIZE);
+ }
+ }
+ log.info("Using NonBlockingMemLimitQueue limit of " + MAX_MEM_USAGE);
+ }
+}
Added: chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestChunkQueue.java
URL: http://svn.apache.org/viewvc/chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestChunkQueue.java?rev=1607199&view=auto
==============================================================================
--- chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestChunkQueue.java (added)
+++ chukwa/trunk/src/test/java/org/apache/hadoop/chukwa/datacollection/agent/TestChunkQueue.java Tue Jul 1 21:39:37 2014
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.agent;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.chukwa.datacollection.ChunkQueue;
+import org.apache.hadoop.chukwa.datacollection.DataFactory;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent.AlreadyRunningException;
+import org.apache.hadoop.conf.Configuration;
+
+import junit.framework.TestCase;
+
+public class TestChunkQueue extends TestCase {
+
+ private final byte[] data = "this is a chunk".getBytes();
+ List<Chunk> putList, getList;
+ int NUM_CHUNKS = 10;
+ int QUEUE_SIZE = 6;
+ int QUEUE_LIMIT = data.length * QUEUE_SIZE;
+ ChukwaAgent agent = null;
+ Configuration conf = null;
+ final String CHUNK_QUEUE_LIMIT = "chukwaAgent.chunk.queue.limit";
+ final String CHUNK_QUEUE = "chukwaAgent.chunk.queue";
+ DataFactory df = DataFactory.getInstance();
+
+ @Override
+ protected void setUp() throws AlreadyRunningException {
+ agent = ChukwaAgent.getAgent();
+ if(agent == null){
+ agent = new ChukwaAgent();
+ }
+ conf = agent.getConfiguration();
+ conf.set(CHUNK_QUEUE_LIMIT, Integer.toString(QUEUE_LIMIT));
+ putList = new ArrayList<Chunk>(10);
+ for (int i = 1; i <= NUM_CHUNKS; i++) {
+ Chunk c = new ChunkImpl("DataType", "StreamName", (long) i, data, null);
+ putList.add(c);
+ }
+ }
+
+ @Override
+ protected void tearDown() {
+ if(agent != null){
+ agent.shutdown();
+ }
+ }
+
+ public void testMemLimitQueue() {
+ conf.set(CHUNK_QUEUE, "org.apache.hadoop.chukwa.datacollection.agent.MemLimitQueue");
+ ChunkQueue mlq = df.createEventQueue();
+ testBlockingNature(mlq);
+ }
+
+ public void testNonBlockingMemLimitQueue() {
+ conf.set(CHUNK_QUEUE, "org.apache.hadoop.chukwa.datacollection.agent.NonBlockingMemLimitQueue");
+ ChunkQueue nbmlq = df.createEventQueue();
+ testNonBlockingNature(nbmlq);
+ }
+
+ /**
+ * Putter thread gets a list of chunks and adds all of them
+ * to the ChunkQueue
+ */
+ private class Putter extends Thread {
+ List<Chunk> chunks;
+ ChunkQueue q;
+
+ Putter(List<Chunk> chunks, ChunkQueue q) {
+ this.chunks = chunks;
+ this.q = q;
+ }
+
+ public void run() {
+ try {
+ for (Chunk c : chunks) {
+ q.add(c);
+ }
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ /**
+ * Getter thread collects all the chunks from the
+ * ChunkQueue indefinitely
+ */
+ private class Getter extends Thread {
+ List<Chunk> chunks;
+ ChunkQueue q;
+
+ Getter(List<Chunk> chunks, ChunkQueue q) {
+ this.chunks = chunks;
+ this.q = q;
+ }
+
+ public void run() {
+ try {
+ while (true) {
+ q.collect(chunks, Integer.MAX_VALUE);
+ }
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ private void joinThread(Thread t, int timeout) {
+ try {
+ t.join(timeout);
+ } catch (InterruptedException e) {
+ }
+ }
+
+ /**
+ * This test makes sure that the putter thread blocks when queue is full
+ *
+ * @param q
+ * the ChunkQueue under test
+ */
+ private void testBlockingNature(ChunkQueue q) {
+ Putter putter = new Putter(putList, q);
+ putter.start();
+ joinThread(putter, 3000);
+ if (!putter.isAlive()) {
+ fail("Blocking queue semantics not implemented");
+ }
+ assertTrue("Could not verify queue size after put", q.size() == QUEUE_SIZE);
+ getList = new ArrayList<Chunk>();
+ Getter getter = new Getter(getList, q);
+ getter.start();
+ joinThread(getter, 3000);
+ assertTrue("Could not verify queue size after get", q.size() == 0);
+ // make sure we got all chunks
+ assertTrue("Could not verify all chunks got drained after get",
+ getList.size() == NUM_CHUNKS);
+ putter.interrupt();
+ getter.interrupt();
+ }
+
+ /**
+ * This test makes sure that the putter thread does not block when queue is
+ * full. This test does not check if the queue implementation uses a circular
+ * buffer to retain the most recent chunks or discards new incoming chunks
+ *
+ * @param q
+ * the ChunkQueue under test
+ */
+ private void testNonBlockingNature(ChunkQueue q) {
+ Putter putter = new Putter(putList, q);
+ putter.start();
+ joinThread(putter, 3000);
+ if (putter.isAlive()) {
+ fail("Non Blocking queue semantics not implemented");
+ }
+ assertTrue("Could not verify queue size after put", q.size() == QUEUE_SIZE);
+ getList = new ArrayList<Chunk>();
+ Getter getter = new Getter(getList, q);
+ getter.start();
+ joinThread(getter, 3000);
+ assertTrue("Could not verify all chunks got drained after get",
+ q.size() == 0);
+ // make sure we got only the chunks that fit in the queue
+ assertTrue("Could not verify chunks after get",
+ getList.size() == QUEUE_SIZE);
+ putter.interrupt();
+ getter.interrupt();
+ }
+}