You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by pr...@apache.org on 2012/02/24 06:17:01 UTC

svn commit: r1293084 - in /incubator/flume/branches/flume-728: flume-ng-core/src/main/java/org/apache/flume/channel/ flume-ng-core/src/test/java/org/apache/flume/channel/ flume-ng-core/src/test/java/org/apache/flume/source/ flume-ng-node/src/test/java/...

Author: prasadm
Date: Fri Feb 24 05:17:00 2012
New Revision: 1293084

URL: http://svn.apache.org/viewvc?rev=1293084&view=rev
Log:
FLUME-936: MemoryChannel is not thread safe
(Juhani Connolly via Prasad Mujumdar)

Added:
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelConcurrency.java
Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
    incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java?rev=1293084&r1=1293083&r2=1293084&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java Fri Feb 24 05:17:00 2012
@@ -17,217 +17,159 @@
  */
 package org.apache.flume.channel;
 
-import java.util.LinkedList;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.flume.ChannelException;
+import javax.annotation.concurrent.GuardedBy;
+
 import org.apache.flume.Context;
 import org.apache.flume.Event;
-import org.apache.flume.Transaction;
+import org.apache.flume.ChannelException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
-/**
- * Memory channel that with full transaction support Uses transaction object for
- * each thread (source and sink) attached to channel. The events are stored in
- * the thread safe Dequeue. * The put and take are directly executed in the
- * common queue. Channel has a marker for the last committed event in order to
- * avoid sink reading uncommitted data. The transactions keep track of the
- * actions to perform undo when rolled back.
- *
- */
-public class MemoryChannel extends AbstractChannel {
-
-  private static final Integer defaultCapacity = 50;
+public class MemoryChannel extends BasicChannelSemantics {
+  private static Logger LOGGER = LoggerFactory.getLogger(MemoryChannel.class);
+  private static final Integer defaultCapacity = 100;
+  private static final Integer defaultTransCapacity = 100;
   private static final Integer defaultKeepAlive = 3;
 
-  // wrap the event with a counter
-  private class StampedEvent {
-    private int timeStamp;
-    private Event event;
-
-    public StampedEvent(int stamp, Event E) {
-      timeStamp = stamp;
-      event = E;
-    }
-
-    public int getStamp() {
-      return timeStamp;
+  public class MemoryTransaction extends BasicTransactionSemantics {
+    private LinkedBlockingDeque<Event> takeList;
+    private LinkedBlockingDeque<Event> putList;
+
+    public MemoryTransaction(int transCapacity) {
+      putList = new LinkedBlockingDeque<Event>(transCapacity);
+      takeList = new LinkedBlockingDeque<Event>(transCapacity);
     }
 
-    public Event getEvent() {
-      return event;
-    }
-
-  }
-
-  /*
-   * transaction class maintain a 'undo' list for put/take from the queue. The
-   * rollback performs undo of the operations using these lists. Also maintain a
-   * stamp/counter for commit and last take. This is used to ensure that a
-   * transaction doesn't read uncommitted events.
-   */
-  public class MemTransaction implements Transaction {
-    private int putStamp;
-    private int takeStamp;
-    private LinkedList<StampedEvent> undoTakeList;
-    private LinkedList<StampedEvent> undoPutList;
-    private TransactionState txnState;
-    private int refCount;
-
-    public MemTransaction() {
-      txnState = TransactionState.Closed;
+    @Override
+    protected void doPut(Event event) {
+      if(!putList.offer(event)) {
+        throw new ChannelException("Put queue for MemoryTransaction of capacity " +
+            putList.size() + " full, consider committing more frequently, " +
+            "increasing capacity or increasing thread count");
+      }
     }
 
     @Override
-    /**
-     * Start the transaction
-     *  initialize the undo lists, stamps
-     *  set transaction state to Started
-     */
-    public void begin() {
-      if (++refCount > 1) {
-        return;
-      }
-      undoTakeList = new LinkedList<StampedEvent>();
-      undoPutList = new LinkedList<StampedEvent>();
-      putStamp = 0;
-      takeStamp = 0;
-      txnState = TransactionState.Started;
+    protected Event doTake() throws InterruptedException {
+      if(takeList.remainingCapacity() == 0) {
+        throw new ChannelException("Take list for MemoryTransaction, capacity " +
+            takeList.size() + " full, consider committing more frequently, " +
+            "increasing capacity, or increasing thread count");
+      }
+      if(!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
+        return null;
+      }
+      Event event;
+      synchronized(queueLock) {
+        event = queue.poll();
+      }
+      Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +
+          "signalling existence of entry");
+      takeList.put(event);
+
+      return event;
     }
 
     @Override
-    /**
-     * Commit the transaction
-     *  If there was an event added by this transaction, then set the
-     *  commit stamp set transaction state to Committed
-     */
-    public void commit() {
-      Preconditions.checkArgument(txnState == TransactionState.Started,
-          "transaction not started");
-      if (--refCount > 0) {
-        return;
-      }
-
-      // if the txn put any events, then update the channel's stamp and
-      // signal for availability of committed data in the queue
-      if (putStamp != 0) {
-        lastCommitStamp.set(putStamp);
-        lock.lock();
-        try {
-          hasData.signal();
-        } finally {
-          lock.unlock();
+    protected void doCommit() throws InterruptedException {
+      int remainingChange = takeList.size() - putList.size();
+      if(remainingChange < 0) {
+        if(!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
+          throw new ChannelException("Space for commit to queue couldn't be acquired" +
+              " Sinks are likely not keeping up with sources, or the buffer size is too tight");
         }
       }
-      txnState = TransactionState.Committed;
-      undoPutList.clear();
-      undoTakeList.clear();
-    }
+      int puts = putList.size();
+      synchronized(queueLock) {
+        if(puts > 0 ) {
+          while(!putList.isEmpty()) {
+            if(!queue.offer(putList.removeFirst())) {
+              throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
+            }
+          }
+        }
+        putList.clear();
+        takeList.clear();
+      }
+      queueStored.release(puts);
+      if(remainingChange > 0) {
+        queueRemaining.release(remainingChange);
+      }
 
-    @Override
-    /**
-     * Rollback the transaction
-     *  execute the channel's undoXXX to undo the actions done by this txn
-     *  set transaction state to rolled back
-     */
-    public void rollback() {
-      Preconditions.checkArgument(txnState == TransactionState.Started,
-          "transaction not started");
-      undoPut(this);
-      undoTake(this);
-      txnState = TransactionState.RolledBack;
-      refCount = 0;
     }
 
     @Override
-    /**
-     * Close the transaction
-     *  if the transaction is still open, then roll it back
-     *  set transaction state to Closed
-     */
-    public void close() {
-      if (txnState == TransactionState.Started) {
-        rollback();
+    protected void doRollback() {
+      int takes = takeList.size();
+      synchronized(queueLock) {
+        Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " +
+            "queue to rollback takes. This should never happen, please report");
+        while(!takeList.isEmpty()) {
+          queue.addFirst(takeList.removeLast());
+        }
+        putList.clear();
       }
-      txnState = TransactionState.Closed;
-      forgetTransaction(this);
+      queueStored.release(takes);
     }
 
-    public TransactionState getState() {
-      return txnState;
-    }
+  }
 
-    protected int lastTakeStamp() {
-      return takeStamp;
-    }
+  // lock to guard queue, mainly needed to keep it locked down during resizes
+  // it should never be held through a blocking operation
+  private Integer queueLock;
 
-    protected void logPut(StampedEvent e, int stamp) {
-      undoPutList.addLast(e);
-      putStamp = stamp;
-    }
+  @GuardedBy(value = "queueLock")
+  private LinkedBlockingDeque<Event> queue;
 
-    protected void logTake(StampedEvent e, int stamp) {
-      undoTakeList.addLast(e);
-      takeStamp = stamp;
-    }
+  // invariant that tracks the amount of space remaining in the queue(with all uncommitted takeLists deducted)
+  // we maintain the remaining permits = queue.remaining - takeList.size()
+  // this allows local threads waiting for space in the queue to commit without denying access to the
+  // shared lock to threads that would make more space on the queue
+  private Semaphore queueRemaining;
+  // used to make "reservations" to grab data from the queue.
+  // by using this we can block for a while to get data without locking all other threads out
+  // like we would if we tried to use a blocking call on queue
+  private Semaphore queueStored;
+  // maximum items in a transaction queue
+  private volatile Integer transCapacity;
+  private volatile int keepAlive;
 
-    protected StampedEvent removePut() {
-      if (undoPutList.isEmpty()) {
-        return null;
-      } else {
-        return undoPutList.removeLast();
-      }
-    }
 
-    protected StampedEvent removeTake() {
-      if (undoTakeList.isEmpty()) {
-        return null;
-      } else {
-        return undoTakeList.removeLast();
-      }
-    }
-
-  }
-
-  // The main event queue
-  private LinkedBlockingDeque<StampedEvent> queue;
-
-  private AtomicInteger currentStamp; // operation counter
-  private AtomicInteger lastCommitStamp; // counter for the last commit
-  private ConcurrentHashMap<Long, MemTransaction> txnMap; // open transactions
-  private Integer keepAlive;
-  final Lock lock = new ReentrantLock();
-  final Condition hasData = lock.newCondition();
-
-  /**
-   * Channel constructor
-   */
   public MemoryChannel() {
-    currentStamp = new AtomicInteger(1);
-    lastCommitStamp = new AtomicInteger(0);
-    txnMap = new ConcurrentHashMap<Long, MemTransaction>();
+    super();
+    queueLock = 0;
   }
 
-  /**
-   * set the event queue capacity
-   */
   @Override
   public void configure(Context context) {
-    String strCapacity = context.get("capacity", String.class);
+    String strCapacity = context.getString("capacity");
     Integer capacity = null;
-
-    if (strCapacity == null) {
+    if(strCapacity == null) {
       capacity = defaultCapacity;
     } else {
-      capacity = Integer.parseInt(strCapacity);
+      try {
+        capacity = Integer.parseInt(strCapacity);
+      } catch(NumberFormatException e) {
+        capacity = defaultCapacity;
+      }
+    }
+    String strTransCapacity = context.getString("transactionCapacity");
+    if(strTransCapacity == null) {
+      transCapacity = defaultTransCapacity;
+    } else {
+      try {
+        transCapacity = Integer.parseInt(strTransCapacity);
+      } catch(NumberFormatException e) {
+        transCapacity = defaultTransCapacity;
+      }
     }
+    Preconditions.checkState(transCapacity <= capacity);
 
     String strKeepAlive = context.get("keep-alive", String.class);
 
@@ -237,139 +179,51 @@ public class MemoryChannel extends Abstr
       keepAlive = Integer.parseInt(strKeepAlive);
     }
 
-    queue = new LinkedBlockingDeque<StampedEvent>(capacity);
-  }
-
-  @Override
-  /**
-   * Add the given event to the end of the queue
-   * save the event in the undoPut queue for possible rollback
-   * save the stamp of this put for commit
-   */
-  public void put(Event event) {
-    Preconditions.checkState(queue != null,
-        "No queue defined (Did you forget to configure me?");
-
-    try {
-      MemTransaction myTxn = findTransaction();
-      Preconditions.checkState(myTxn != null, "Transaction not started");
-
-      int myStamp = currentStamp.getAndIncrement();
-      StampedEvent stampedEvent = new StampedEvent(myStamp, event);
-      if (queue.offer(stampedEvent,keepAlive, TimeUnit.SECONDS) == false)
-        throw new ChannelException("put(" + event + ") timed out");
-      myTxn.logPut(stampedEvent, myStamp);
-
-    } catch (InterruptedException ex) {
-      throw new ChannelException("Failed to put(" + event + ")", ex);
+    if(queue != null) {
+      try {
+        resizeQueue(capacity);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    } else {
+      synchronized(queueLock) {
+        queue = new LinkedBlockingDeque<Event>(capacity);
+        queueRemaining = new Semaphore(capacity);
+        queueStored = new Semaphore(0);
+      }
     }
   }
 
-  /**
-   * undo of put for all the events in the undoPut queue, remove those from the
-   * event queue
-   *
-   * @param myTxn
-   */
-  protected void undoPut(MemTransaction myTxn) {
-    StampedEvent undoEvent;
-    StampedEvent currentEvent;
-
-    while ((undoEvent = myTxn.removePut()) != null) {
-      currentEvent = queue.removeLast();
-      Preconditions.checkNotNull(currentEvent, "Rollback error");
-      Preconditions.checkArgument(currentEvent == undoEvent, "Rollback error");
+  private void resizeQueue(int capacity) throws InterruptedException {
+    int oldCapacity;
+    synchronized(queueLock) {
+      oldCapacity = queue.size() + queue.remainingCapacity();
     }
-  }
 
-  @Override
-  /**
-   * remove the event from the top of the queue and return it
-   * also add that event to undoTake queue for possible rollback
-   */
-  public Event take() {
-    Preconditions.checkState(queue != null, "Queue not configured");
-
-    try {
-      MemTransaction myTxn = findTransaction();
-      Preconditions.checkState(myTxn != null, "Transaction not started");
-      Event event = null;
-      int timeout = keepAlive;
-
-      // wait for some committed data be there in the queue
-      if ((timeout > 0) && (myTxn.lastTakeStamp() == lastCommitStamp.get())) {
-        lock.lock();
-        try {
-          hasData.await(timeout, TimeUnit.SECONDS);
-        } finally {
-          lock.unlock();
+    if(oldCapacity == capacity) {
+      return;
+    } else if (oldCapacity > capacity) {
+      if(!queueRemaining.tryAcquire(oldCapacity - capacity, keepAlive, TimeUnit.SECONDS)) {
+        LOGGER.warn("Couldn't acquire permits to downsize the queue, resizing has been aborted");
+      } else {
+        synchronized(queueLock) {
+          LinkedBlockingDeque<Event> newQueue = new LinkedBlockingDeque<Event>(capacity);
+          newQueue.addAll(queue);
+          queue = newQueue;
         }
-        timeout = 0; // don't wait any further
       }
-
-      // don't go past the last committed element
-      if (myTxn.lastTakeStamp() != lastCommitStamp.get()) {
-        StampedEvent e = queue.poll(timeout, TimeUnit.SECONDS);
-        if (e != null) {
-          myTxn.logTake(e, e.getStamp());
-          event = e.getEvent();
-        }
+    } else {
+      synchronized(queueLock) {
+        LinkedBlockingDeque<Event> newQueue = new LinkedBlockingDeque<Event>(capacity);
+        newQueue.addAll(queue);
+        queue = newQueue;
       }
-      return event;
-    } catch (InterruptedException ex) {
-      throw new ChannelException("Failed to take()", ex);
-    }
-  }
-
-  /**
-   * undo of take operation for each event in the undoTake list, add it back to
-   * the event queue
-   *
-   * @param myTxn
-   */
-  protected void undoTake(MemTransaction myTxn) {
-    StampedEvent e;
-
-    while ((e = myTxn.removeTake()) != null) {
-      queue.addFirst(e);
+      queueRemaining.release(capacity - oldCapacity);
     }
   }
 
   @Override
-  /**
-   * Return the channel's transaction
-   */
-  public Transaction getTransaction() {
-    MemTransaction txn;
-
-    // check if there's already a transaction created for this thread
-    txn = findTransaction();
-
-    // Create a new transaction
-    if (txn == null) {
-      txn = new MemTransaction();
-      txnMap.put(Thread.currentThread().getId(), txn);
-    }
-    return txn;
-  }
-
-  /**
-   * Remove the given transaction from the list of open transactions
-   *
-   * @param myTxn
-   */
-  protected void forgetTransaction(MemTransaction myTxn) {
-    MemTransaction currTxn = findTransaction();
-    Preconditions.checkArgument(myTxn == currTxn, "Wrong transaction to close");
-    txnMap.remove(Thread.currentThread().getId());
-  }
-
-  // lookup the transaction for the current thread
-  protected MemTransaction findTransaction() {
-    try {
-      return txnMap.get(Thread.currentThread().getId());
-    } catch (NullPointerException eN) {
-      return null;
-    }
+  protected BasicTransactionSemantics createTransaction() {
+    return new MemoryTransaction(transCapacity);
   }
 }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java?rev=1293084&r1=1293083&r2=1293084&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java Fri Feb 24 05:17:00 2012
@@ -19,7 +19,11 @@
 
 package org.apache.flume.channel;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
@@ -52,6 +56,7 @@ public class TestMemoryChannel {
     transaction.begin();
     channel.put(event);
     transaction.commit();
+    transaction.close();
 
     transaction = channel.getTransaction();
     Assert.assertNotNull(transaction);
@@ -62,4 +67,169 @@ public class TestMemoryChannel {
     transaction.commit();
   }
 
+  @Test
+  public void testChannelResize() {
+    Context context = new Context();
+    Map<String, Object> parms = new HashMap<String, Object>();
+    parms.put("capacity", "5");
+    parms.put("transactionCapacity", "5");
+    context.setParameters(parms);
+    Configurables.configure(channel,  context);
+
+    Transaction transaction = channel.getTransaction();
+    transaction.begin();
+    for(int i=0; i < 5; i++) {
+      channel.put(EventBuilder.withBody(String.format("test event %d", i).getBytes()));
+    }
+    transaction.commit();
+    transaction.close();
+
+    /*
+     * Verify overflow semantics
+     */
+    transaction = channel.getTransaction();
+    boolean overflowed = false;
+    try {
+      transaction.begin();
+      channel.put(EventBuilder.withBody("overflow event".getBytes()));
+      transaction.commit();
+    } catch (ChannelException e) {
+      overflowed = true;
+      transaction.rollback();
+    } finally {
+      transaction.close();
+    }
+    Assert.assertTrue(overflowed);
+
+    /*
+     * Reconfigure capacity down and add another event, shouldn't result in exception
+     */
+    parms.put("capacity", "6");
+    context.setParameters(parms);
+    Configurables.configure(channel, context);
+    transaction = channel.getTransaction();
+    transaction.begin();
+    channel.put(EventBuilder.withBody("extended capacity event".getBytes()));
+    transaction.commit();
+    transaction.close();
+
+    /*
+     * Attempt to reconfigure capacity to below current entry count and verify
+     * it wasn't carried out
+     */
+    parms.put("capacity", "2");
+    parms.put("transactionCapacity", "2");
+    context.setParameters(parms);
+    Configurables.configure(channel, context);
+    for(int i=0; i < 6; i++) {
+      transaction = channel.getTransaction();
+      transaction.begin();
+      Assert.assertNotNull(channel.take());
+      transaction.commit();
+      transaction.close();
+    }
+  }
+
+  @Test(expected=ChannelException.class)
+  public void testTransactionPutCapacityOverload() {
+    Context context = new Context();
+    Map<String, Object> parms = new HashMap<String, Object>();
+    parms.put("capacity", "5");
+    parms.put("transactionCapacity", "2");
+    context.setParameters(parms);
+    Configurables.configure(channel,  context);
+
+    Transaction transaction = channel.getTransaction();
+    transaction.begin();
+    channel.put(EventBuilder.withBody("test".getBytes()));
+    channel.put(EventBuilder.withBody("test".getBytes()));
+    // shouldn't be able to fit a third in the buffer
+    channel.put(EventBuilder.withBody("test".getBytes()));
+    Assert.fail();
+  }
+
+  @Test(expected=ChannelException.class)
+  public void testCapacityOverload() {
+    Context context = new Context();
+    Map<String, Object> parms = new HashMap<String, Object>();
+    parms.put("capacity", "5");
+    parms.put("transactionCapacity", "3");
+    context.setParameters(parms);
+    Configurables.configure(channel,  context);
+
+    Transaction transaction = channel.getTransaction();
+    transaction.begin();
+    channel.put(EventBuilder.withBody("test".getBytes()));
+    channel.put(EventBuilder.withBody("test".getBytes()));
+    channel.put(EventBuilder.withBody("test".getBytes()));
+    transaction.commit();
+    transaction.close();
+
+    transaction = channel.getTransaction();
+    transaction.begin();
+    channel.put(EventBuilder.withBody("test".getBytes()));
+    channel.put(EventBuilder.withBody("test".getBytes()));
+    channel.put(EventBuilder.withBody("test".getBytes()));
+    // this should kill  it
+    transaction.commit();
+    Assert.fail();
+  }
+
+  @Test
+  public void testBufferEmptyingAfterTakeCommit() {
+    Context context = new Context();
+    Map<String, Object> parms = new HashMap<String, Object>();
+    parms.put("capacity", "3");
+    parms.put("transactionCapacity", "3");
+    context.setParameters(parms);
+    Configurables.configure(channel,  context);
+
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    channel.put(EventBuilder.withBody("test".getBytes()));
+    channel.put(EventBuilder.withBody("test".getBytes()));
+    channel.put(EventBuilder.withBody("test".getBytes()));
+    tx.commit();
+    tx.close();
+
+    tx = channel.getTransaction();
+    tx.begin();
+    channel.take();
+    channel.take();
+    tx.commit();
+    tx.close();
+
+    tx = channel.getTransaction();
+    tx.begin();
+    channel.put(EventBuilder.withBody("test".getBytes()));
+    channel.put(EventBuilder.withBody("test".getBytes()));
+    tx.commit();
+    tx.close();
+  }
+
+  @Test
+  public void testBufferEmptyingAfterRollback() {
+    Context context = new Context();
+    Map<String, Object> parms = new HashMap<String, Object>();
+    parms.put("capacity", "3");
+    parms.put("transactionCapacity", "3");
+    context.setParameters(parms);
+    Configurables.configure(channel,  context);
+
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    channel.put(EventBuilder.withBody("test".getBytes()));
+    channel.put(EventBuilder.withBody("test".getBytes()));
+    channel.put(EventBuilder.withBody("test".getBytes()));
+    tx.rollback();
+    tx.close();
+
+    tx = channel.getTransaction();
+    tx.begin();
+    channel.put(EventBuilder.withBody("test".getBytes()));
+    channel.put(EventBuilder.withBody("test".getBytes()));
+    channel.put(EventBuilder.withBody("test".getBytes()));
+    tx.commit();
+    tx.close();
+  }
 }

Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelConcurrency.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelConcurrency.java?rev=1293084&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelConcurrency.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelConcurrency.java Fri Feb 24 05:17:00 2012
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel;
+
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestMemoryChannelConcurrency {
+
+  private CyclicBarrier barrier;
+
+  @Before
+  public void setUp() {
+  }
+
+  @Test
+  public void testTransactionConcurrency() throws InterruptedException {
+    final Channel channel = new MemoryChannel();
+    barrier = new CyclicBarrier(2);
+
+    Configurables.configure(channel, new Context());
+    Thread t1 = new Thread(new Runnable() {
+      public void run() {
+        Transaction tx = channel.getTransaction();
+        tx.begin();
+        channel.put(EventBuilder.withBody("first event".getBytes()));
+        try {
+          barrier.await();
+          barrier.await();
+          tx.rollback();
+
+          barrier.await();
+          tx.close();
+          // final barrier to make sure both threads manage to finish
+          barrier.await();
+        } catch (InterruptedException e) {
+          Assert.fail();
+        } catch (BrokenBarrierException e) {
+          Assert.fail();
+        }
+      }
+    });
+
+    Thread t2 = new Thread(new Runnable() {
+      public void run() {
+        Transaction tx = channel.getTransaction();
+        try {
+          barrier.await();
+          tx.begin();
+          channel.put(EventBuilder.withBody("second event".getBytes()));
+          barrier.await();
+          barrier.await();
+          tx.commit();
+          tx.close();
+          // final barrier to make sure both threads manage to finish
+          barrier.await();
+        } catch (InterruptedException e) {
+          Assert.fail();
+        } catch (BrokenBarrierException e) {
+          Assert.fail();
+        }
+      }
+    });
+    t1.start();
+    t2.start();
+
+    t1.join(1000);
+    if (t1.isAlive()) {
+      Assert.fail("Thread1 failed to finish");
+      t1.interrupt();
+    }
+
+    t2.join(1000);
+    if (t2.isAlive()) {
+      Assert.fail("Thread2 failed to finish");
+      t2.interrupt();
+    }
+
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    Event e = channel.take();
+    Assert.assertEquals("second event", new String(e.getBody()));
+    Assert.assertNull(channel.take());
+    tx.commit();
+    tx.close();
+  }
+
+  /**
+   * Works with a startgate/endgate latches to make sure all threads run at the same time. Threads randomly
+   * choose to commit or rollback random numbers of actions, tagging them with the thread no.
+   * The correctness check is made by recording committed entries into a map, and verifying the count
+   * after the endgate
+   * Since nothing is taking the puts out, allow for a big capacity
+   *
+   * @throws InterruptedException
+   */
+  @Test
+  public void testManyThreads() throws InterruptedException {
+    final Channel channel = new MemoryChannel();
+    Context context = new Context();
+    context.put("keep-alive", "1");
+    context.put("capacity", "5000"); // theoretical maximum of 100 threads * 10 * 5
+    // because we're just grabbing the whole lot in one commit
+    // normally a transactionCapacity significantly lower than the channel capacity would be recommended
+    context.put("transactionCapacity", "5000");
+    Configurables.configure(channel, context);
+    final ConcurrentHashMap<String, AtomicInteger> committedPuts =
+        new ConcurrentHashMap<String, AtomicInteger>();
+
+    final int threadCount = 100;
+    final CountDownLatch startGate = new CountDownLatch(1);
+    final CountDownLatch endGate = new CountDownLatch(threadCount);
+
+    for (int i = 0; i < threadCount; i++) {
+      Thread t = new Thread() {
+        public void run() {
+          Long tid = Thread.currentThread().getId();
+          String strtid = tid.toString();
+          Random rng = new Random(tid);
+
+          try {
+            startGate.await();
+          } catch (InterruptedException e1) {
+            Thread.currentThread().interrupt();
+          }
+          for(int j = 0; j < 10; j++) {
+            int events = rng.nextInt(5) + 1;
+            Transaction tx = channel.getTransaction();
+            tx.begin();
+            for(int k = 0; k < events; k++) {
+              channel.put(EventBuilder.withBody(strtid.getBytes()));
+            }
+            if(rng.nextBoolean()) {
+              tx.commit();
+              AtomicInteger tcount = committedPuts.get(strtid);
+              if(tcount == null) {
+                committedPuts.put(strtid, new AtomicInteger(events));
+              } else {
+                tcount.addAndGet(events);
+              }
+            } else {
+              tx.rollback();
+            }
+            tx.close();
+          }
+          endGate.countDown();
+        }
+      };
+      t.start();
+    }
+    startGate.countDown();
+    endGate.await();
+
+    if(committedPuts.isEmpty()) {
+      Assert.fail();
+    }
+
+    // verify the counts
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    Event e;
+    while((e = channel.take()) != null) {
+      String index = new String(e.getBody());
+      AtomicInteger remain = committedPuts.get(index);
+      int post = remain.decrementAndGet();
+      if(post == 0) {
+        committedPuts.remove(index);
+      }
+    }
+    tx.commit();
+    tx.close();
+    if(!committedPuts.isEmpty()) {
+      Assert.fail();
+    }
+  }
+
+  @Test
+  public void testConcurrentSinksAndSources() throws InterruptedException {
+    final Channel channel = new MemoryChannel();
+    Context context = new Context();
+    context.put("keep-alive", "1");
+    context.put("capacity", "100"); // theoretical maximum of 100 threads * 10 * 5
+    // because we're just grabbing the whole lot in one commit
+    // normally a transactionCapacity significantly lower than the channel capacity would be recommended
+    context.put("transactionCapacity", "100");
+    Configurables.configure(channel, context);
+    final ConcurrentHashMap<String, AtomicInteger> committedPuts = new ConcurrentHashMap<String, AtomicInteger>();
+    final ConcurrentHashMap<String, AtomicInteger> committedTakes =
+        new ConcurrentHashMap<String, AtomicInteger>();
+
+    final int threadCount = 100;
+    final CountDownLatch startGate = new CountDownLatch(1);
+    final CountDownLatch endGate = new CountDownLatch(threadCount);
+
+    // start a sink and source for each
+    for (int i = 0; i < threadCount/2; i++) {
+      Thread t = new Thread() {
+        public void run() {
+          Long tid = Thread.currentThread().getId();
+          String strtid = tid.toString();
+          Random rng = new Random(tid);
+
+          try {
+            startGate.await();
+          } catch (InterruptedException e1) {
+            Thread.currentThread().interrupt();
+          }
+          for(int j = 0; j < 10; j++) {
+            int events = rng.nextInt(5) + 1;
+            Transaction tx = channel.getTransaction();
+            tx.begin();
+            for(int k = 0; k < events; k++) {
+              channel.put(EventBuilder.withBody(strtid.getBytes()));
+            }
+            if(rng.nextBoolean()) {
+              try {
+                tx.commit();
+                AtomicInteger tcount = committedPuts.get(strtid);
+                if(tcount == null) {
+                  committedPuts.put(strtid, new AtomicInteger(events));
+                } else {
+                  tcount.addAndGet(events);
+                }
+              } catch(ChannelException e) {
+                System.out.print("puts commit failed");
+                tx.rollback();
+              }
+            } else {
+              tx.rollback();
+            }
+            tx.close();
+          }
+          endGate.countDown();
+        }
+      };
+      // start source
+      t.start();
+      final Integer takeMapLock = 0;
+      t = new Thread() {
+        public void run() {
+          Random rng = new Random(Thread.currentThread().getId());
+
+          try {
+            startGate.await();
+          } catch (InterruptedException e1) {
+            Thread.currentThread().interrupt();
+          }
+          for(int j = 0; j < 10; j++) {
+            int events = rng.nextInt(5) + 1;
+            Transaction tx = channel.getTransaction();
+            tx.begin();
+            Event[] taken = new Event[events];
+            int k;
+            for(k = 0; k < events; k++) {
+              taken[k] = channel.take();
+              if(taken[k] == null) break;
+            }
+            if(rng.nextBoolean()) {
+              try {
+                tx.commit();
+                for(Event e : taken) {
+                  if(e == null) break;
+                  String index = new String(e.getBody());
+                  synchronized(takeMapLock) {
+                    AtomicInteger remain = committedTakes.get(index);
+                    if(remain == null) {
+                      committedTakes.put(index, new AtomicInteger(1));
+                    } else {
+                      remain.incrementAndGet();
+                    }
+                  }
+                }
+              } catch (ChannelException e) {
+                System.out.print("takes commit failed");
+                tx.rollback();
+              }
+            } else {
+              tx.rollback();
+            }
+            tx.close();
+          }
+          endGate.countDown();
+        }
+      };
+      // start sink
+      t.start();
+    }
+    startGate.countDown();
+    if(!endGate.await(20, TimeUnit.SECONDS)) {
+      Assert.fail("Not all threads ended succesfully");
+    }
+
+    // verify the counts
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    Event e;
+    // first pull out what's left in the channel and remove it from the
+    // committed map
+    while((e = channel.take()) != null) {
+      String index = new String(e.getBody());
+      AtomicInteger remain = committedPuts.get(index);
+      int post = remain.decrementAndGet();
+      if(post == 0) {
+        committedPuts.remove(index);
+      }
+    }
+    tx.commit();
+    tx.close();
+
+    // now just check the committed puts match the committed takes
+    for(Entry<String, AtomicInteger> takes : committedTakes.entrySet()) {
+      AtomicInteger count = committedPuts.get(takes.getKey());
+      if(count == null)
+        Assert.fail("Putted data doesn't exist");
+      if(count.get() != takes.getValue().get())
+        Assert.fail(String.format("Mismatched put and take counts expected %d had %d", count.get(), takes.getValue().get()));
+      committedPuts.remove(takes.getKey());
+    }
+    if(!committedPuts.isEmpty()) Assert.fail("Puts still has entries remaining");
+  }
+}

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java?rev=1293084&r1=1293083&r2=1293084&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java Fri Feb 24 05:17:00 2012
@@ -25,11 +25,12 @@ import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Transaction;
 import org.apache.flume.Transaction.TransactionState;
-import org.apache.flume.channel.MemoryChannel.MemTransaction;
+import org.apache.flume.channel.MemoryChannel.MemoryTransaction;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class TestMemoryChannelTransaction {
@@ -49,6 +50,8 @@ public class TestMemoryChannelTransactio
     int putCounter = 0;
 
     context.put("keep-alive", "1");
+    context.put("capacity", "100");
+    context.put("transactionCapacity", "50");
     Configurables.configure(channel, context);
 
     Transaction transaction = channel.getTransaction();
@@ -152,6 +155,7 @@ public class TestMemoryChannelTransactio
     transaction.close();
   }
 
+  @Ignore("BasicChannelSemantics doesn't support re-entrant transactions")
   @Test
   public void testReEntTxn() throws InterruptedException,
       EventDeliveryException {
@@ -172,12 +176,8 @@ public class TestMemoryChannelTransactio
       event = EventBuilder.withBody(("test event" + putCounter).getBytes());
       channel.put(event);
       transaction.commit(); // inner commit
-      Assert.assertEquals(((MemTransaction) transaction).getState(),
-          TransactionState.Started);
     }
     transaction.commit();
-    Assert.assertEquals(((MemTransaction) transaction).getState(),
-        TransactionState.Committed);
     transaction.close();
 
     transaction = channel.getTransaction();
@@ -197,6 +197,7 @@ public class TestMemoryChannelTransactio
     transaction.close();
   }
 
+  @Ignore("BasicChannelSemantics doesn't support re-entrant transactions")
   @Test
   public void testReEntTxnRollBack() throws InterruptedException,
       EventDeliveryException {
@@ -248,15 +249,11 @@ public class TestMemoryChannelTransactio
       Assert.assertNotNull("lost an event", event2);
       Assert.assertArrayEquals(event2.getBody(), ("test event" + i).getBytes());
       transaction.commit(); // inner commit
-      Assert.assertEquals(((MemTransaction) transaction).getState(),
-          TransactionState.Started);
     }
     event2 = channel.take();
     Assert.assertNull("extra event found", event2);
 
     transaction.rollback();
-    Assert.assertEquals(((MemTransaction) transaction).getState(),
-        TransactionState.RolledBack);
     transaction.close();
 
     // verify that the events were left in there due to rollback

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java?rev=1293084&r1=1293083&r2=1293084&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java Fri Feb 24 05:17:00 2012
@@ -58,6 +58,9 @@ public class TestExecSource {
     Context context = new Context();
 
     context.put("command", "cat /etc/passwd");
+    context.put("keep-alive", "1");
+    context.put("capacity", "1000");
+    context.put("transactionCapacity", "1000");
     Configurables.configure(source, context);
     Configurables.configure(channel, context);
 

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java?rev=1293084&r1=1293083&r2=1293084&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java Fri Feb 24 05:17:00 2012
@@ -57,7 +57,6 @@ public class TestNetcatSource {
     source = new NetcatSource();
 
     Context context = new Context();
-    context.put("capacity", "50");
 
     Configurables.configure(channel, context);
     List<Channel> channels = new ArrayList<Channel>();