You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ja...@apache.org on 2016/03/09 20:05:33 UTC
flume git commit: FLUME-2891: Revert FLUME-2712 and FLUME-2886
Repository: flume
Updated Branches:
refs/heads/trunk ffb52b9e6 -> caa64a1a6
FLUME-2891: Revert FLUME-2712 and FLUME-2886
(Hari Shreedharan via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/caa64a1a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/caa64a1a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/caa64a1a
Branch: refs/heads/trunk
Commit: caa64a1a6d4bc97be5993cb468516e9ffe862794
Parents: ffb52b9
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Wed Mar 9 11:05:01 2016 -0800
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Wed Mar 9 11:05:01 2016 -0800
----------------------------------------------------------------------
.../apache/flume/channel/ChannelProcessor.java | 151 ++++++++++++-------
.../flume/channel/TestChannelProcessor.java | 52 +------
2 files changed, 98 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/caa64a1a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java b/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
index 7b2de7c..1cce137 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
@@ -20,13 +20,10 @@ package org.apache.flume.channel;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.*;
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
@@ -60,8 +57,6 @@ public class ChannelProcessor implements Configurable {
private final ChannelSelector selector;
private final InterceptorChain interceptorChain;
- private ExecutorService execService;
- BlockingQueue<Runnable> taskQueue;
public ChannelProcessor(ChannelSelector selector) {
this.selector = selector;
@@ -82,13 +77,6 @@ public class ChannelProcessor implements Configurable {
*/
@Override
public void configure(Context context) {
- int queueSize = context.getInteger("pendingTransactions", 20);
- taskQueue = new ArrayBlockingQueue<Runnable>(queueSize, true);
- ThreadFactory factory = new ThreadFactoryBuilder()
- .setNameFormat("OptionalChannelProcessorThread").build();
- this.execService =
- new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, taskQueue,
- factory, new ThreadPoolExecutor.DiscardPolicy());
configureInterceptors(context);
}
@@ -165,6 +153,7 @@ public class ChannelProcessor implements Configurable {
for (Event event : events) {
List<Channel> reqChannels = selector.getRequiredChannels(event);
+
for (Channel ch : reqChannels) {
List<Event> eventQueue = reqChannelQueue.get(ch);
if (eventQueue == null) {
@@ -175,26 +164,74 @@ public class ChannelProcessor implements Configurable {
}
List<Channel> optChannels = selector.getOptionalChannels(event);
+
for (Channel ch: optChannels) {
List<Event> eventQueue = optChannelQueue.get(ch);
if (eventQueue == null) {
eventQueue = new ArrayList<Event>();
optChannelQueue.put(ch, eventQueue);
}
+
eventQueue.add(event);
}
}
// Process required channels
for (Channel reqChannel : reqChannelQueue.keySet()) {
- List<Event> batch = reqChannelQueue.get(reqChannel);
- executeChannelTransaction(reqChannel, batch, false);
+ Transaction tx = reqChannel.getTransaction();
+ Preconditions.checkNotNull(tx, "Transaction object must not be null");
+ try {
+ tx.begin();
+
+ List<Event> batch = reqChannelQueue.get(reqChannel);
+
+ for (Event event : batch) {
+ reqChannel.put(event);
+ }
+
+ tx.commit();
+ } catch (Throwable t) {
+ tx.rollback();
+ if (t instanceof Error) {
+ LOG.error("Error while writing to required channel: " +
+ reqChannel, t);
+ throw (Error) t;
+ } else {
+ throw new ChannelException("Unable to put batch on required " +
+ "channel: " + reqChannel, t);
+ }
+ } finally {
+ if (tx != null) {
+ tx.close();
+ }
+ }
}
// Process optional channels
for (Channel optChannel : optChannelQueue.keySet()) {
- List<Event> batch = optChannelQueue.get(optChannel);
- execService.submit(new OptionalChannelTransactionRunnable(optChannel, batch));
+ Transaction tx = optChannel.getTransaction();
+ Preconditions.checkNotNull(tx, "Transaction object must not be null");
+ try {
+ tx.begin();
+
+ List<Event> batch = optChannelQueue.get(optChannel);
+
+ for (Event event : batch ) {
+ optChannel.put(event);
+ }
+
+ tx.commit();
+ } catch (Throwable t) {
+ tx.rollback();
+ LOG.error("Unable to put batch on optional channel: " + optChannel, t);
+ if (t instanceof Error) {
+ throw (Error) t;
+ }
+ } finally {
+ if (tx != null) {
+ tx.close();
+ }
+ }
}
}
@@ -216,59 +253,57 @@ public class ChannelProcessor implements Configurable {
if (event == null) {
return;
}
- List<Event> events = new ArrayList<Event>(1);
- events.add(event);
// Process required channels
List<Channel> requiredChannels = selector.getRequiredChannels(event);
for (Channel reqChannel : requiredChannels) {
- executeChannelTransaction(reqChannel, events, false);
+ Transaction tx = reqChannel.getTransaction();
+ Preconditions.checkNotNull(tx, "Transaction object must not be null");
+ try {
+ tx.begin();
+
+ reqChannel.put(event);
+
+ tx.commit();
+ } catch (Throwable t) {
+ tx.rollback();
+ if (t instanceof Error) {
+ LOG.error("Error while writing to required channel: " +
+ reqChannel, t);
+ throw (Error) t;
+ } else {
+ throw new ChannelException("Unable to put event on required " +
+ "channel: " + reqChannel, t);
+ }
+ } finally {
+ if (tx != null) {
+ tx.close();
+ }
+ }
}
// Process optional channels
List<Channel> optionalChannels = selector.getOptionalChannels(event);
for (Channel optChannel : optionalChannels) {
- execService.submit(new OptionalChannelTransactionRunnable(optChannel, events));
- }
- }
-
- private static void executeChannelTransaction(Channel channel, List<Event> batch, boolean isOptional) {
- Transaction tx = channel.getTransaction();
- Preconditions.checkNotNull(tx, "Transaction object must not be null");
- try {
- tx.begin();
+ Transaction tx = null;
+ try {
+ tx = optChannel.getTransaction();
+ tx.begin();
- for (Event event : batch) {
- channel.put(event);
- }
+ optChannel.put(event);
- tx.commit();
- } catch (Throwable t) {
- tx.rollback();
- if (t instanceof Error) {
- LOG.error("Error while writing to channel: " +
- channel, t);
- throw (Error) t;
- } else if(!isOptional) {
- throw new ChannelException("Unable to put batch on required " +
- "channel: " + channel, t);
+ tx.commit();
+ } catch (Throwable t) {
+ tx.rollback();
+ LOG.error("Unable to put event on optional channel: " + optChannel, t);
+ if (t instanceof Error) {
+ throw (Error) t;
+ }
+ } finally {
+ if (tx != null) {
+ tx.close();
+ }
}
- } finally {
- tx.close();
- }
- }
-
- private static class OptionalChannelTransactionRunnable implements Runnable {
- private Channel channel;
- private List<Event> events;
-
- OptionalChannelTransactionRunnable(Channel channel, List<Event> events) {
- this.channel = channel;
- this.events = events;
- }
-
- public void run() {
- executeChannelTransaction(channel, events, true);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/caa64a1a/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java b/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
index c2a5748..b37b823 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
@@ -23,12 +23,8 @@ import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
-import org.apache.flume.Channel;
-import org.apache.flume.ChannelException;
-import org.apache.flume.ChannelSelector;
-import org.apache.flume.Event;
-import org.apache.flume.Transaction;
-import org.apache.flume.Context;
+
+import org.apache.flume.*;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.junit.Assert;
@@ -85,9 +81,9 @@ public class TestChannelProcessor {
}
/*
- * Test delivery to optional and required channels
- * Test both processEvent and processEventBatch
- */
+ * Test delivery to optional and required channels
+ * Test both processEvent and processEventBatch
+ */
@Test
public void testRequiredAndOptionalChannels() {
Context context = new Context();
@@ -148,42 +144,4 @@ public class TestChannelProcessor {
}
}
- @SuppressWarnings("unchecked")
- @Test
- public void testOptionalChannelQueueSize() throws InterruptedException {
- Context context = new Context();
- context.put("capacity", "100");
- context.put("transactionCapacity", "3");
- context.put("pendingTransactions", "2");
-
- ArrayList<MemoryChannel> channels = new ArrayList<MemoryChannel>();
- for (int i = 0; i < 2; i++) {
- MemoryChannel ch = new MemoryChannel();
- ch.setName("ch" + i);
- channels.add(ch);
- }
- Configurables.configure(channels.get(0), context);
- context.put("capacity", "3");
- Configurables.configure(channels.get(1), context);
- ChannelSelector selector = new ReplicatingChannelSelector();
- selector.setChannels((List) channels);
-
- context.put(ReplicatingChannelSelector.CONFIG_OPTIONAL, "ch1");
- Configurables.configure(selector, context);
-
- ChannelProcessor processor = new ChannelProcessor(selector);
- Configurables.configure(processor, context);
-
- // The idea is to put more events into the optional channel than its capacity + the size of
- // the task queue. So the remaining events get added to the task queue, but since it is
- // bounded, its size should not grow indefinitely either.
- for (int i = 0; i <= 6; i++) {
- processor.processEvent(EventBuilder.withBody("e".getBytes()));
- // To avoid tasks from being rejected so if previous events are still not committed, wait
- // between transactions.
- Thread.sleep(500);
- }
- // 3 in channel, 1 executing, 2 in queue, 1 rejected
- Assert.assertEquals(2, processor.taskQueue.size());
- }
}