You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ja...@apache.org on 2016/02/23 17:18:02 UTC

flume git commit: FLUME-2886: Optional Channels can cause OOMs

Repository: flume
Updated Branches:
  refs/heads/trunk de6ecf485 -> 109ec3072


FLUME-2886: Optional Channels can cause OOMs

(Hari Shreedharan via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/109ec307
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/109ec307
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/109ec307

Branch: refs/heads/trunk
Commit: 109ec30725a4c665a2ccf5f40af8a0e455cf4166
Parents: de6ecf4
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Tue Feb 23 08:17:34 2016 -0800
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Tue Feb 23 08:17:34 2016 -0800

----------------------------------------------------------------------
 .../apache/flume/channel/ChannelProcessor.java  | 13 ++++---
 .../flume/channel/TestChannelProcessor.java     | 38 ++++++++++++++++++++
 2 files changed, 47 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/109ec307/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java b/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
index f2612a6..7b2de7c 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
@@ -26,8 +26,7 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.*;
 
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelException;
@@ -62,6 +61,7 @@ public class ChannelProcessor implements Configurable {
   private final ChannelSelector selector;
   private final InterceptorChain interceptorChain;
   private ExecutorService execService;
+  BlockingQueue<Runnable> taskQueue;
 
   public ChannelProcessor(ChannelSelector selector) {
     this.selector = selector;
@@ -82,8 +82,13 @@ public class ChannelProcessor implements Configurable {
    */
   @Override
   public void configure(Context context) {
-    this.execService = Executors.newSingleThreadExecutor(
-      new ThreadFactoryBuilder().setNameFormat("OptionalChannelProcessorThread").build());
+    int queueSize = context.getInteger("pendingTransactions", 20);
+    taskQueue = new ArrayBlockingQueue<Runnable>(queueSize, true);
+    ThreadFactory factory = new ThreadFactoryBuilder()
+      .setNameFormat("OptionalChannelProcessorThread").build();
+    this.execService =
+      new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, taskQueue,
+        factory, new ThreadPoolExecutor.DiscardPolicy());
     configureInterceptors(context);
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/109ec307/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java b/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
index 924c998..c2a5748 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
@@ -148,4 +148,42 @@ public class TestChannelProcessor {
     }
   }
 
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testOptionalChannelQueueSize() throws InterruptedException {
+    Context context = new Context();
+    context.put("capacity", "100");
+    context.put("transactionCapacity", "3");
+    context.put("pendingTransactions", "2");
+
+    ArrayList<MemoryChannel> channels = new ArrayList<MemoryChannel>();
+    for (int i = 0; i < 2; i++) {
+      MemoryChannel ch = new MemoryChannel();
+      ch.setName("ch" + i);
+      channels.add(ch);
+    }
+    Configurables.configure(channels.get(0), context);
+    context.put("capacity", "3");
+    Configurables.configure(channels.get(1), context);
+    ChannelSelector selector = new ReplicatingChannelSelector();
+    selector.setChannels((List) channels);
+
+    context.put(ReplicatingChannelSelector.CONFIG_OPTIONAL, "ch1");
+    Configurables.configure(selector, context);
+
+    ChannelProcessor processor = new ChannelProcessor(selector);
+    Configurables.configure(processor, context);
+
+    // The idea is to put more events into the optional channel than its capacity + the size of
+    // the task queue. So the remaining events get added to the task queue, but since it is
+    // bounded, its size should not grow indefinitely either.
+    for (int i = 0; i <= 6; i++) {
+      processor.processEvent(EventBuilder.withBody("e".getBytes()));
+      // To avoid tasks from being rejected so if previous events are still not committed, wait
+      // between transactions.
+      Thread.sleep(500);
+    }
+    // 3 in channel, 1 executing, 2 in queue, 1 rejected
+    Assert.assertEquals(2, processor.taskQueue.size());
+  }
 }