You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2015/10/31 00:37:30 UTC

flume git commit: FLUME-2712. Optional channel errors slow down the Source to Main channel event rate

Repository: flume
Updated Branches:
  refs/heads/trunk 0e40e8311 -> 8bb556604


FLUME-2712. Optional channel errors slow down the Source to Main channel event rate

(Johny Rufus via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/8bb55660
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/8bb55660
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/8bb55660

Branch: refs/heads/trunk
Commit: 8bb556604047974775eb2da4c5c1686d89fe62d2
Parents: 0e40e83
Author: Hari Shreedharan <hs...@apache.org>
Authored: Fri Oct 30 16:36:40 2015 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Fri Oct 30 16:37:18 2015 -0700

----------------------------------------------------------------------
 .../apache/flume/channel/ChannelProcessor.java  | 146 +++++++------------
 .../flume/channel/TestChannelProcessor.java     |  69 +++++++++
 2 files changed, 122 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/8bb55660/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java b/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
index 1cce137..f2612a6 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
@@ -20,10 +20,14 @@ package org.apache.flume.channel;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelException;
@@ -57,6 +61,7 @@ public class ChannelProcessor implements Configurable {
 
   private final ChannelSelector selector;
   private final InterceptorChain interceptorChain;
+  private ExecutorService execService;
 
   public ChannelProcessor(ChannelSelector selector) {
     this.selector = selector;
@@ -77,6 +82,8 @@ public class ChannelProcessor implements Configurable {
    */
   @Override
   public void configure(Context context) {
+    this.execService = Executors.newSingleThreadExecutor(
+      new ThreadFactoryBuilder().setNameFormat("OptionalChannelProcessorThread").build());
     configureInterceptors(context);
   }
 
@@ -153,7 +160,6 @@ public class ChannelProcessor implements Configurable {
 
     for (Event event : events) {
       List<Channel> reqChannels = selector.getRequiredChannels(event);
-
       for (Channel ch : reqChannels) {
         List<Event> eventQueue = reqChannelQueue.get(ch);
         if (eventQueue == null) {
@@ -164,74 +170,26 @@ public class ChannelProcessor implements Configurable {
       }
 
       List<Channel> optChannels = selector.getOptionalChannels(event);
-
       for (Channel ch: optChannels) {
         List<Event> eventQueue = optChannelQueue.get(ch);
         if (eventQueue == null) {
           eventQueue = new ArrayList<Event>();
           optChannelQueue.put(ch, eventQueue);
         }
-
         eventQueue.add(event);
       }
     }
 
     // Process required channels
     for (Channel reqChannel : reqChannelQueue.keySet()) {
-      Transaction tx = reqChannel.getTransaction();
-      Preconditions.checkNotNull(tx, "Transaction object must not be null");
-      try {
-        tx.begin();
-
-        List<Event> batch = reqChannelQueue.get(reqChannel);
-
-        for (Event event : batch) {
-          reqChannel.put(event);
-        }
-
-        tx.commit();
-      } catch (Throwable t) {
-        tx.rollback();
-        if (t instanceof Error) {
-          LOG.error("Error while writing to required channel: " +
-              reqChannel, t);
-          throw (Error) t;
-        } else {
-          throw new ChannelException("Unable to put batch on required " +
-              "channel: " + reqChannel, t);
-        }
-      } finally {
-        if (tx != null) {
-          tx.close();
-        }
-      }
+      List<Event> batch = reqChannelQueue.get(reqChannel);
+      executeChannelTransaction(reqChannel, batch, false);
     }
 
     // Process optional channels
     for (Channel optChannel : optChannelQueue.keySet()) {
-      Transaction tx = optChannel.getTransaction();
-      Preconditions.checkNotNull(tx, "Transaction object must not be null");
-      try {
-        tx.begin();
-
-        List<Event> batch = optChannelQueue.get(optChannel);
-
-        for (Event event : batch ) {
-          optChannel.put(event);
-        }
-
-        tx.commit();
-      } catch (Throwable t) {
-        tx.rollback();
-        LOG.error("Unable to put batch on optional channel: " + optChannel, t);
-        if (t instanceof Error) {
-          throw (Error) t;
-        }
-      } finally {
-        if (tx != null) {
-          tx.close();
-        }
-      }
+      List<Event> batch = optChannelQueue.get(optChannel);
+      execService.submit(new OptionalChannelTransactionRunnable(optChannel, batch));
     }
   }
 
@@ -253,57 +211,59 @@ public class ChannelProcessor implements Configurable {
     if (event == null) {
       return;
     }
+    List<Event> events = new ArrayList<Event>(1);
+    events.add(event);
 
     // Process required channels
     List<Channel> requiredChannels = selector.getRequiredChannels(event);
     for (Channel reqChannel : requiredChannels) {
-      Transaction tx = reqChannel.getTransaction();
-      Preconditions.checkNotNull(tx, "Transaction object must not be null");
-      try {
-        tx.begin();
-
-        reqChannel.put(event);
-
-        tx.commit();
-      } catch (Throwable t) {
-        tx.rollback();
-        if (t instanceof Error) {
-          LOG.error("Error while writing to required channel: " +
-              reqChannel, t);
-          throw (Error) t;
-        } else {
-          throw new ChannelException("Unable to put event on required " +
-              "channel: " + reqChannel, t);
-        }
-      } finally {
-        if (tx != null) {
-          tx.close();
-        }
-      }
+      executeChannelTransaction(reqChannel, events, false);
     }
 
     // Process optional channels
     List<Channel> optionalChannels = selector.getOptionalChannels(event);
     for (Channel optChannel : optionalChannels) {
-      Transaction tx = null;
-      try {
-        tx = optChannel.getTransaction();
-        tx.begin();
+      execService.submit(new OptionalChannelTransactionRunnable(optChannel, events));
+    }
+  }
 
-        optChannel.put(event);
+  private static void executeChannelTransaction(Channel channel, List<Event> batch, boolean isOptional) {
+    Transaction tx = channel.getTransaction();
+    Preconditions.checkNotNull(tx, "Transaction object must not be null");
+    try {
+      tx.begin();
 
-        tx.commit();
-      } catch (Throwable t) {
-        tx.rollback();
-        LOG.error("Unable to put event on optional channel: " + optChannel, t);
-        if (t instanceof Error) {
-          throw (Error) t;
-        }
-      } finally {
-        if (tx != null) {
-          tx.close();
-        }
+      for (Event event : batch) {
+        channel.put(event);
       }
+
+      tx.commit();
+    } catch (Throwable t) {
+      tx.rollback();
+      if (t instanceof Error) {
+        LOG.error("Error while writing to channel: " +
+                channel, t);
+        throw (Error) t;
+      } else if(!isOptional) {
+          throw new ChannelException("Unable to put batch on required " +
+                  "channel: " + channel, t);
+      }
+    } finally {
+      tx.close();
+    }
+  }
+
+  private static class OptionalChannelTransactionRunnable implements Runnable {
+    private Channel channel;
+    private List<Event> events;
+
+    OptionalChannelTransactionRunnable(Channel channel, List<Event> events) {
+      this.channel = channel;
+      this.events = events;
+    }
+
+    public void run() {
+      executeChannelTransaction(channel, events, true);
     }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/8bb55660/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java b/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
index 0656596..924c998 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
@@ -20,11 +20,16 @@ package org.apache.flume.channel;
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelException;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.Context;
+import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
 import org.junit.Assert;
 import org.junit.Test;
@@ -79,4 +84,68 @@ public class TestChannelProcessor {
     Assert.assertTrue("Must throw NPE", threw);
   }
 
+  /*
+   * Test delivery to optional and required channels
+   * Test both processEvent and processEventBatch
+   */
+  @Test
+  public void testRequiredAndOptionalChannels() {
+    Context context = new Context();
+    ArrayList<Channel> channels = new ArrayList<Channel>();
+    for(int i = 0; i < 4; i++) {
+      Channel ch = new MemoryChannel();
+      ch.setName("ch"+i);
+      Configurables.configure(ch, context);
+      channels.add(ch);
+    }
+
+    ChannelSelector selector = new ReplicatingChannelSelector();
+    selector.setChannels(channels);
+
+    context = new Context();
+    context.put(ReplicatingChannelSelector.CONFIG_OPTIONAL, "ch2 ch3");
+    Configurables.configure(selector, context);
+
+    ChannelProcessor processor = new ChannelProcessor(selector);
+    context = new Context();
+    Configurables.configure(processor, context);
+
+
+    Event event1 = EventBuilder.withBody("event 1", Charsets.UTF_8);
+    processor.processEvent(event1);
+    try {
+      Thread.sleep(3000);
+    } catch (InterruptedException e) {
+    }
+
+    for(Channel channel : channels) {
+      Transaction transaction = channel.getTransaction();
+      transaction.begin();
+      Event event_ch = channel.take();
+      Assert.assertEquals(event1, event_ch);
+      transaction.commit();
+      transaction.close();
+    }
+
+    List<Event> events = Lists.newArrayList();
+    for(int i = 0; i < 100; i ++) {
+      events.add(EventBuilder.withBody("event "+i, Charsets.UTF_8));
+    }
+    processor.processEventBatch(events);
+    try {
+      Thread.sleep(3000);
+    } catch (InterruptedException e) {
+    }
+    for(Channel channel : channels) {
+      Transaction transaction = channel.getTransaction();
+      transaction.begin();
+      for(int i = 0; i < 100; i ++) {
+        Event event_ch = channel.take();
+        Assert.assertNotNull(event_ch);
+      }
+      transaction.commit();
+      transaction.close();
+    }
+  }
+
 }