You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2015/10/31 00:37:30 UTC
flume git commit: FLUME-2712. Optional channel errors slows down the
Source to Main channel event rate
Repository: flume
Updated Branches:
refs/heads/trunk 0e40e8311 -> 8bb556604
FLUME-2712. Optional channel errors slows down the Source to Main channel event rate
(Johny Rufus via Hari)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/8bb55660
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/8bb55660
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/8bb55660
Branch: refs/heads/trunk
Commit: 8bb556604047974775eb2da4c5c1686d89fe62d2
Parents: 0e40e83
Author: Hari Shreedharan <hs...@apache.org>
Authored: Fri Oct 30 16:36:40 2015 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Fri Oct 30 16:37:18 2015 -0700
----------------------------------------------------------------------
.../apache/flume/channel/ChannelProcessor.java | 146 +++++++------------
.../flume/channel/TestChannelProcessor.java | 69 +++++++++
2 files changed, 122 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/8bb55660/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java b/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
index 1cce137..f2612a6 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
@@ -20,10 +20,14 @@ package org.apache.flume.channel;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
@@ -57,6 +61,7 @@ public class ChannelProcessor implements Configurable {
private final ChannelSelector selector;
private final InterceptorChain interceptorChain;
+ private ExecutorService execService;
public ChannelProcessor(ChannelSelector selector) {
this.selector = selector;
@@ -77,6 +82,8 @@ public class ChannelProcessor implements Configurable {
*/
@Override
public void configure(Context context) {
+ this.execService = Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder().setNameFormat("OptionalChannelProcessorThread").build());
configureInterceptors(context);
}
@@ -153,7 +160,6 @@ public class ChannelProcessor implements Configurable {
for (Event event : events) {
List<Channel> reqChannels = selector.getRequiredChannels(event);
-
for (Channel ch : reqChannels) {
List<Event> eventQueue = reqChannelQueue.get(ch);
if (eventQueue == null) {
@@ -164,74 +170,26 @@ public class ChannelProcessor implements Configurable {
}
List<Channel> optChannels = selector.getOptionalChannels(event);
-
for (Channel ch: optChannels) {
List<Event> eventQueue = optChannelQueue.get(ch);
if (eventQueue == null) {
eventQueue = new ArrayList<Event>();
optChannelQueue.put(ch, eventQueue);
}
-
eventQueue.add(event);
}
}
// Process required channels
for (Channel reqChannel : reqChannelQueue.keySet()) {
- Transaction tx = reqChannel.getTransaction();
- Preconditions.checkNotNull(tx, "Transaction object must not be null");
- try {
- tx.begin();
-
- List<Event> batch = reqChannelQueue.get(reqChannel);
-
- for (Event event : batch) {
- reqChannel.put(event);
- }
-
- tx.commit();
- } catch (Throwable t) {
- tx.rollback();
- if (t instanceof Error) {
- LOG.error("Error while writing to required channel: " +
- reqChannel, t);
- throw (Error) t;
- } else {
- throw new ChannelException("Unable to put batch on required " +
- "channel: " + reqChannel, t);
- }
- } finally {
- if (tx != null) {
- tx.close();
- }
- }
+ List<Event> batch = reqChannelQueue.get(reqChannel);
+ executeChannelTransaction(reqChannel, batch, false);
}
// Process optional channels
for (Channel optChannel : optChannelQueue.keySet()) {
- Transaction tx = optChannel.getTransaction();
- Preconditions.checkNotNull(tx, "Transaction object must not be null");
- try {
- tx.begin();
-
- List<Event> batch = optChannelQueue.get(optChannel);
-
- for (Event event : batch ) {
- optChannel.put(event);
- }
-
- tx.commit();
- } catch (Throwable t) {
- tx.rollback();
- LOG.error("Unable to put batch on optional channel: " + optChannel, t);
- if (t instanceof Error) {
- throw (Error) t;
- }
- } finally {
- if (tx != null) {
- tx.close();
- }
- }
+ List<Event> batch = optChannelQueue.get(optChannel);
+ execService.submit(new OptionalChannelTransactionRunnable(optChannel, batch));
}
}
@@ -253,57 +211,59 @@ public class ChannelProcessor implements Configurable {
if (event == null) {
return;
}
+ List<Event> events = new ArrayList<Event>(1);
+ events.add(event);
// Process required channels
List<Channel> requiredChannels = selector.getRequiredChannels(event);
for (Channel reqChannel : requiredChannels) {
- Transaction tx = reqChannel.getTransaction();
- Preconditions.checkNotNull(tx, "Transaction object must not be null");
- try {
- tx.begin();
-
- reqChannel.put(event);
-
- tx.commit();
- } catch (Throwable t) {
- tx.rollback();
- if (t instanceof Error) {
- LOG.error("Error while writing to required channel: " +
- reqChannel, t);
- throw (Error) t;
- } else {
- throw new ChannelException("Unable to put event on required " +
- "channel: " + reqChannel, t);
- }
- } finally {
- if (tx != null) {
- tx.close();
- }
- }
+ executeChannelTransaction(reqChannel, events, false);
}
// Process optional channels
List<Channel> optionalChannels = selector.getOptionalChannels(event);
for (Channel optChannel : optionalChannels) {
- Transaction tx = null;
- try {
- tx = optChannel.getTransaction();
- tx.begin();
+ execService.submit(new OptionalChannelTransactionRunnable(optChannel, events));
+ }
+ }
- optChannel.put(event);
+ private static void executeChannelTransaction(Channel channel, List<Event> batch, boolean isOptional) {
+ Transaction tx = channel.getTransaction();
+ Preconditions.checkNotNull(tx, "Transaction object must not be null");
+ try {
+ tx.begin();
- tx.commit();
- } catch (Throwable t) {
- tx.rollback();
- LOG.error("Unable to put event on optional channel: " + optChannel, t);
- if (t instanceof Error) {
- throw (Error) t;
- }
- } finally {
- if (tx != null) {
- tx.close();
- }
+ for (Event event : batch) {
+ channel.put(event);
}
+
+ tx.commit();
+ } catch (Throwable t) {
+ tx.rollback();
+ if (t instanceof Error) {
+ LOG.error("Error while writing to channel: " +
+ channel, t);
+ throw (Error) t;
+ } else if(!isOptional) {
+ throw new ChannelException("Unable to put batch on required " +
+ "channel: " + channel, t);
+ }
+ } finally {
+ tx.close();
+ }
+ }
+
+ private static class OptionalChannelTransactionRunnable implements Runnable {
+ private Channel channel;
+ private List<Event> events;
+
+ OptionalChannelTransactionRunnable(Channel channel, List<Event> events) {
+ this.channel = channel;
+ this.events = events;
+ }
+
+ public void run() {
+ executeChannelTransaction(channel, events, true);
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flume/blob/8bb55660/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java b/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
index 0656596..924c998 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
@@ -20,11 +20,16 @@ package org.apache.flume.channel;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
import java.util.List;
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.ChannelSelector;
import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.Context;
+import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.junit.Assert;
import org.junit.Test;
@@ -79,4 +84,68 @@ public class TestChannelProcessor {
Assert.assertTrue("Must throw NPE", threw);
}
+ /*
+ * Test delivery to optional and required channels
+ * Test both processEvent and processEventBatch
+ */
+ @Test
+ public void testRequiredAndOptionalChannels() {
+ Context context = new Context();
+ ArrayList<Channel> channels = new ArrayList<Channel>();
+ for(int i = 0; i < 4; i++) {
+ Channel ch = new MemoryChannel();
+ ch.setName("ch"+i);
+ Configurables.configure(ch, context);
+ channels.add(ch);
+ }
+
+ ChannelSelector selector = new ReplicatingChannelSelector();
+ selector.setChannels(channels);
+
+ context = new Context();
+ context.put(ReplicatingChannelSelector.CONFIG_OPTIONAL, "ch2 ch3");
+ Configurables.configure(selector, context);
+
+ ChannelProcessor processor = new ChannelProcessor(selector);
+ context = new Context();
+ Configurables.configure(processor, context);
+
+
+ Event event1 = EventBuilder.withBody("event 1", Charsets.UTF_8);
+ processor.processEvent(event1);
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ }
+
+ for(Channel channel : channels) {
+ Transaction transaction = channel.getTransaction();
+ transaction.begin();
+ Event event_ch = channel.take();
+ Assert.assertEquals(event1, event_ch);
+ transaction.commit();
+ transaction.close();
+ }
+
+ List<Event> events = Lists.newArrayList();
+ for(int i = 0; i < 100; i ++) {
+ events.add(EventBuilder.withBody("event "+i, Charsets.UTF_8));
+ }
+ processor.processEventBatch(events);
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {
+ }
+ for(Channel channel : channels) {
+ Transaction transaction = channel.getTransaction();
+ transaction.begin();
+ for(int i = 0; i < 100; i ++) {
+ Event event_ch = channel.take();
+ Assert.assertNotNull(event_ch);
+ }
+ transaction.commit();
+ transaction.close();
+ }
+ }
+
}