You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/10/11 09:03:46 UTC

svn commit: r1181636 - /incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java

Author: esammer
Date: Tue Oct 11 07:03:46 2011
New Revision: 1181636

URL: http://svn.apache.org/viewvc?rev=1181636&view=rev
Log:
FLUME-784: MemoryChannel should poll with timeout on take() rather than block indefinitely

Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java?rev=1181636&r1=1181635&r2=1181636&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java Tue Oct 11 07:03:46 2011
@@ -19,6 +19,7 @@ package org.apache.flume.channel;
 
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelException;
@@ -32,17 +33,24 @@ import com.google.common.base.Preconditi
 public class MemoryChannel implements Channel, Configurable {
 
   private static final Integer defaultCapacity = 50;
+  private static final Integer defaultKeepAlive = 3;
 
   private BlockingQueue<Event> queue;
+  private Integer keepAlive;
 
   @Override
   public void configure(Context context) {
     Integer capacity = context.get("capacity", Integer.class);
+    keepAlive = context.get("keep-alive", Integer.class);
 
     if (capacity == null) {
       capacity = defaultCapacity;
     }
 
+    if (keepAlive == null) {
+      keepAlive = defaultKeepAlive;
+    }
+
     queue = new ArrayBlockingQueue<Event>(capacity);
   }
 
@@ -64,7 +72,7 @@ public class MemoryChannel implements Ch
         "No queue defined (Did you forget to configure me?");
 
     try {
-      return queue.take();
+      return queue.poll(keepAlive, TimeUnit.SECONDS);
     } catch (InterruptedException ex) {
       throw new ChannelException("Failed to take()", ex);
     }