You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ja...@apache.org on 2016/03/09 20:05:33 UTC

flume git commit: FLUME-2891: Revert FLUME-2712 and FLUME-2886

Repository: flume
Updated Branches:
  refs/heads/trunk ffb52b9e6 -> caa64a1a6


FLUME-2891: Revert FLUME-2712 and FLUME-2886

(Hari Shreedharan via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/caa64a1a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/caa64a1a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/caa64a1a

Branch: refs/heads/trunk
Commit: caa64a1a6d4bc97be5993cb468516e9ffe862794
Parents: ffb52b9
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Wed Mar 9 11:05:01 2016 -0800
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Wed Mar 9 11:05:01 2016 -0800

----------------------------------------------------------------------
 .../apache/flume/channel/ChannelProcessor.java  | 151 ++++++++++++-------
 .../flume/channel/TestChannelProcessor.java     |  52 +------
 2 files changed, 98 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/caa64a1a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java b/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
index 7b2de7c..1cce137 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java
@@ -20,13 +20,10 @@ package org.apache.flume.channel;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.*;
 
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelException;
@@ -60,8 +57,6 @@ public class ChannelProcessor implements Configurable {
 
   private final ChannelSelector selector;
   private final InterceptorChain interceptorChain;
-  private ExecutorService execService;
-  BlockingQueue<Runnable> taskQueue;
 
   public ChannelProcessor(ChannelSelector selector) {
     this.selector = selector;
@@ -82,13 +77,6 @@ public class ChannelProcessor implements Configurable {
    */
   @Override
   public void configure(Context context) {
-    int queueSize = context.getInteger("pendingTransactions", 20);
-    taskQueue = new ArrayBlockingQueue<Runnable>(queueSize, true);
-    ThreadFactory factory = new ThreadFactoryBuilder()
-      .setNameFormat("OptionalChannelProcessorThread").build();
-    this.execService =
-      new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, taskQueue,
-        factory, new ThreadPoolExecutor.DiscardPolicy());
     configureInterceptors(context);
   }
 
@@ -165,6 +153,7 @@ public class ChannelProcessor implements Configurable {
 
     for (Event event : events) {
       List<Channel> reqChannels = selector.getRequiredChannels(event);
+
       for (Channel ch : reqChannels) {
         List<Event> eventQueue = reqChannelQueue.get(ch);
         if (eventQueue == null) {
@@ -175,26 +164,74 @@ public class ChannelProcessor implements Configurable {
       }
 
       List<Channel> optChannels = selector.getOptionalChannels(event);
+
       for (Channel ch: optChannels) {
         List<Event> eventQueue = optChannelQueue.get(ch);
         if (eventQueue == null) {
           eventQueue = new ArrayList<Event>();
           optChannelQueue.put(ch, eventQueue);
         }
+
         eventQueue.add(event);
       }
     }
 
     // Process required channels
     for (Channel reqChannel : reqChannelQueue.keySet()) {
-      List<Event> batch = reqChannelQueue.get(reqChannel);
-      executeChannelTransaction(reqChannel, batch, false);
+      Transaction tx = reqChannel.getTransaction();
+      Preconditions.checkNotNull(tx, "Transaction object must not be null");
+      try {
+        tx.begin();
+
+        List<Event> batch = reqChannelQueue.get(reqChannel);
+
+        for (Event event : batch) {
+          reqChannel.put(event);
+        }
+
+        tx.commit();
+      } catch (Throwable t) {
+        tx.rollback();
+        if (t instanceof Error) {
+          LOG.error("Error while writing to required channel: " +
+              reqChannel, t);
+          throw (Error) t;
+        } else {
+          throw new ChannelException("Unable to put batch on required " +
+              "channel: " + reqChannel, t);
+        }
+      } finally {
+        if (tx != null) {
+          tx.close();
+        }
+      }
     }
 
     // Process optional channels
     for (Channel optChannel : optChannelQueue.keySet()) {
-      List<Event> batch = optChannelQueue.get(optChannel);
-      execService.submit(new OptionalChannelTransactionRunnable(optChannel, batch));
+      Transaction tx = optChannel.getTransaction();
+      Preconditions.checkNotNull(tx, "Transaction object must not be null");
+      try {
+        tx.begin();
+
+        List<Event> batch = optChannelQueue.get(optChannel);
+
+        for (Event event : batch ) {
+          optChannel.put(event);
+        }
+
+        tx.commit();
+      } catch (Throwable t) {
+        tx.rollback();
+        LOG.error("Unable to put batch on optional channel: " + optChannel, t);
+        if (t instanceof Error) {
+          throw (Error) t;
+        }
+      } finally {
+        if (tx != null) {
+          tx.close();
+        }
+      }
     }
   }
 
@@ -216,59 +253,57 @@ public class ChannelProcessor implements Configurable {
     if (event == null) {
       return;
     }
-    List<Event> events = new ArrayList<Event>(1);
-    events.add(event);
 
     // Process required channels
     List<Channel> requiredChannels = selector.getRequiredChannels(event);
     for (Channel reqChannel : requiredChannels) {
-      executeChannelTransaction(reqChannel, events, false);
+      Transaction tx = reqChannel.getTransaction();
+      Preconditions.checkNotNull(tx, "Transaction object must not be null");
+      try {
+        tx.begin();
+
+        reqChannel.put(event);
+
+        tx.commit();
+      } catch (Throwable t) {
+        tx.rollback();
+        if (t instanceof Error) {
+          LOG.error("Error while writing to required channel: " +
+              reqChannel, t);
+          throw (Error) t;
+        } else {
+          throw new ChannelException("Unable to put event on required " +
+              "channel: " + reqChannel, t);
+        }
+      } finally {
+        if (tx != null) {
+          tx.close();
+        }
+      }
     }
 
     // Process optional channels
     List<Channel> optionalChannels = selector.getOptionalChannels(event);
     for (Channel optChannel : optionalChannels) {
-      execService.submit(new OptionalChannelTransactionRunnable(optChannel, events));
-    }
-  }
-
-  private static void executeChannelTransaction(Channel channel, List<Event> batch, boolean isOptional) {
-    Transaction tx = channel.getTransaction();
-    Preconditions.checkNotNull(tx, "Transaction object must not be null");
-    try {
-      tx.begin();
+      Transaction tx = null;
+      try {
+        tx = optChannel.getTransaction();
+        tx.begin();
 
-      for (Event event : batch) {
-        channel.put(event);
-      }
+        optChannel.put(event);
 
-      tx.commit();
-    } catch (Throwable t) {
-      tx.rollback();
-      if (t instanceof Error) {
-        LOG.error("Error while writing to channel: " +
-                channel, t);
-        throw (Error) t;
-      } else if(!isOptional) {
-          throw new ChannelException("Unable to put batch on required " +
-                  "channel: " + channel, t);
+        tx.commit();
+      } catch (Throwable t) {
+        tx.rollback();
+        LOG.error("Unable to put event on optional channel: " + optChannel, t);
+        if (t instanceof Error) {
+          throw (Error) t;
+        }
+      } finally {
+        if (tx != null) {
+          tx.close();
+        }
       }
-    } finally {
-      tx.close();
-    }
-  }
-
-  private static class OptionalChannelTransactionRunnable implements Runnable {
-    private Channel channel;
-    private List<Event> events;
-
-    OptionalChannelTransactionRunnable(Channel channel, List<Event> events) {
-      this.channel = channel;
-      this.events = events;
-    }
-
-    public void run() {
-      executeChannelTransaction(channel, events, true);
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/caa64a1a/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java b/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
index c2a5748..b37b823 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelProcessor.java
@@ -23,12 +23,8 @@ import com.google.common.collect.Lists;
 
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.flume.Channel;
-import org.apache.flume.ChannelException;
-import org.apache.flume.ChannelSelector;
-import org.apache.flume.Event;
-import org.apache.flume.Transaction;
-import org.apache.flume.Context;
+
+import org.apache.flume.*;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
 import org.junit.Assert;
@@ -85,9 +81,9 @@ public class TestChannelProcessor {
   }
 
   /*
-   * Test delivery to optional and required channels
-   * Test both processEvent and processEventBatch
-   */
+ * Test delivery to optional and required channels
+ * Test both processEvent and processEventBatch
+ */
   @Test
   public void testRequiredAndOptionalChannels() {
     Context context = new Context();
@@ -148,42 +144,4 @@ public class TestChannelProcessor {
     }
   }
 
-  @SuppressWarnings("unchecked")
-  @Test
-  public void testOptionalChannelQueueSize() throws InterruptedException {
-    Context context = new Context();
-    context.put("capacity", "100");
-    context.put("transactionCapacity", "3");
-    context.put("pendingTransactions", "2");
-
-    ArrayList<MemoryChannel> channels = new ArrayList<MemoryChannel>();
-    for (int i = 0; i < 2; i++) {
-      MemoryChannel ch = new MemoryChannel();
-      ch.setName("ch" + i);
-      channels.add(ch);
-    }
-    Configurables.configure(channels.get(0), context);
-    context.put("capacity", "3");
-    Configurables.configure(channels.get(1), context);
-    ChannelSelector selector = new ReplicatingChannelSelector();
-    selector.setChannels((List) channels);
-
-    context.put(ReplicatingChannelSelector.CONFIG_OPTIONAL, "ch1");
-    Configurables.configure(selector, context);
-
-    ChannelProcessor processor = new ChannelProcessor(selector);
-    Configurables.configure(processor, context);
-
-    // The idea is to put more events into the optional channel than its capacity + the size of
-    // the task queue. So the remaining events get added to the task queue, but since it is
-    // bounded, its size should not grow indefinitely either.
-    for (int i = 0; i <= 6; i++) {
-      processor.processEvent(EventBuilder.withBody("e".getBytes()));
-      // To avoid tasks from being rejected so if previous events are still not committed, wait
-      // between transactions.
-      Thread.sleep(500);
-    }
-    // 3 in channel, 1 executing, 2 in queue, 1 rejected
-    Assert.assertEquals(2, processor.taskQueue.size());
-  }
 }