You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/10/11 09:03:46 UTC
svn commit: r1181636 -
/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
Author: esammer
Date: Tue Oct 11 07:03:46 2011
New Revision: 1181636
URL: http://svn.apache.org/viewvc?rev=1181636&view=rev
Log:
FLUME-784: MemoryChannel should poll with timeout on take() rather than block indefinitely
Modified:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java?rev=1181636&r1=1181635&r2=1181636&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java Tue Oct 11 07:03:46 2011
@@ -19,6 +19,7 @@ package org.apache.flume.channel;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
@@ -32,17 +33,24 @@ import com.google.common.base.Preconditi
public class MemoryChannel implements Channel, Configurable {
private static final Integer defaultCapacity = 50;
+ private static final Integer defaultKeepAlive = 3;
private BlockingQueue<Event> queue;
+ private Integer keepAlive;
@Override
public void configure(Context context) {
Integer capacity = context.get("capacity", Integer.class);
+ keepAlive = context.get("keep-alive", Integer.class);
if (capacity == null) {
capacity = defaultCapacity;
}
+ if (keepAlive == null) {
+ keepAlive = defaultKeepAlive;
+ }
+
queue = new ArrayBlockingQueue<Event>(capacity);
}
@@ -64,7 +72,7 @@ public class MemoryChannel implements Ch
"No queue defined (Did you forget to configure me?");
try {
- return queue.take();
+ return queue.poll(keepAlive, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
throw new ChannelException("Failed to take()", ex);
}