You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ja...@apache.org on 2016/02/23 17:18:02 UTC
flume git commit: FLUME-2886: Optional Channels can cause OOMs
Repository: flume
Updated Branches:
refs/heads/trunk de6ecf485 -> 109ec3072
FLUME-2886: Optional Channels can cause OOMs
(Hari Shreedharan via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/109ec307
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/109ec307
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/109ec307
Branch: refs/heads/trunk
Commit: 109ec30725a4c665a2ccf5f40af8a0e455cf4166
Parents: de6ecf4
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Tue Feb 23 08:17:34 2016 -0800
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Tue Feb 23 08:17:34 2016 -0800
----------------------------------------------------------------------
.../apache/flume/channel/ChannelProcessor.java | 13 ++++---
.../flume/channel/TestChannelProcessor.java | 38 ++++++++++++++++++++
2 files changed, 47 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/109ec307/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java b/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
index f2612a6..7b2de7c 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
@@ -26,8 +26,7 @@ import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.*;
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
@@ -62,6 +61,7 @@ public class ChannelProcessor implements Configurable {
private final ChannelSelector selector;
private final InterceptorChain interceptorChain;
private ExecutorService execService;
+ BlockingQueue<Runnable> taskQueue;
public ChannelProcessor(ChannelSelector selector) {
this.selector = selector;
@@ -82,8 +82,13 @@ public class ChannelProcessor implements Configurable {
*/
@Override
public void configure(Context context) {
- this.execService = Executors.newSingleThreadExecutor(
- new ThreadFactoryBuilder().setNameFormat("OptionalChannelProcessorThread").build());
+ int queueSize = context.getInteger("pendingTransactions", 20);
+ taskQueue = new ArrayBlockingQueue<Runnable>(queueSize, true);
+ ThreadFactory factory = new ThreadFactoryBuilder()
+ .setNameFormat("OptionalChannelProcessorThread").build();
+ this.execService =
+ new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, taskQueue,
+ factory, new ThreadPoolExecutor.DiscardPolicy());
configureInterceptors(context);
}
http://git-wip-us.apache.org/repos/asf/flume/blob/109ec307/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java b/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
index 924c998..c2a5748 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
@@ -148,4 +148,42 @@ public class TestChannelProcessor {
}
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testOptionalChannelQueueSize() throws InterruptedException {
+ Context context = new Context();
+ context.put("capacity", "100");
+ context.put("transactionCapacity", "3");
+ context.put("pendingTransactions", "2");
+
+ ArrayList<MemoryChannel> channels = new ArrayList<MemoryChannel>();
+ for (int i = 0; i < 2; i++) {
+ MemoryChannel ch = new MemoryChannel();
+ ch.setName("ch" + i);
+ channels.add(ch);
+ }
+ Configurables.configure(channels.get(0), context);
+ context.put("capacity", "3");
+ Configurables.configure(channels.get(1), context);
+ ChannelSelector selector = new ReplicatingChannelSelector();
+ selector.setChannels((List) channels);
+
+ context.put(ReplicatingChannelSelector.CONFIG_OPTIONAL, "ch1");
+ Configurables.configure(selector, context);
+
+ ChannelProcessor processor = new ChannelProcessor(selector);
+ Configurables.configure(processor, context);
+
+ // The idea is to put more events into the optional channel than its capacity + the size of
+ // the task queue. So the remaining events get added to the task queue, but since it is
+ // bounded, its size should not grow indefinitely either.
+ for (int i = 0; i <= 6; i++) {
+ processor.processEvent(EventBuilder.withBody("e".getBytes()));
+ // To avoid tasks from being rejected so if previous events are still not committed, wait
+ // between transactions.
+ Thread.sleep(500);
+ }
+ // 3 in channel, 1 executing, 2 in queue, 1 rejected
+ Assert.assertEquals(2, processor.taskQueue.size());
+ }
}