You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2011/10/18 00:35:52 UTC

svn commit: r1185412 - in /incubator/flume/branches/flume-728: flume-ng-core/src/main/java/org/apache/flume/channel/ flume-ng-core/src/test/java/org/apache/flume/channel/ flume-ng-core/src/test/java/org/apache/flume/sink/ flume-ng-core/src/test/java/or...

Author: arvind
Date: Mon Oct 17 22:35:52 2011
New Revision: 1185412

URL: http://svn.apache.org/viewvc?rev=1185412&view=rev
Log:
FLUME-722. Transactinal memory channel implementation.

(Prasad Mujumdar via Arvind Prabhakar)

Added:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java
Removed:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MultiOpMemChannel.java
Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
    incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java?rev=1185412&r1=1185411&r2=1185412&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java Mon Oct 17 22:35:52 2011
@@ -17,150 +17,335 @@
  */
 package org.apache.flume.channel;
 
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.LinkedList;
 
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.Transaction;
+//import org.apache.flume.channel.MemoryChannel.MemTransaction;
 import org.apache.flume.conf.Configurable;
 
 import com.google.common.base.Preconditions;
 
 /**
- * <p>
- * A capacity-capped {@link Channel} implementation that supports in-memory
- * buffering and delivery of events.
- * </p>
- * <p>
- * This channel is appropriate for
- * <q>best effort</q> delivery of events where high throughput is favored over
- * data durability. To be clear, <b>this channel offers absolutely no guarantee
- * of event delivery</b> in the face of (any) component failure.
- * </p>
- * <p>
- * TODO: Discuss guarantees, corner cases re: potential data loss (e.g. consumer
- * begins a tx, takes events, and gets SIGKILL before rollback).
- * </p>
- * <p>
- * <b>Configuration options</b>
- * </p>
- * <table>
- * <tr>
- * <th>Parameter</th>
- * <th>Description</th>
- * <th>Unit / Type</th>
- * <th>Default</th>
- * </tr>
- * <tr>
- * <td><tt>capacity</tt></td>
- * <td>The in-memory capacity of this channel. Store up to <tt>capacity</tt>
- * events before refusing new events.</td>
- * <td>events / int</td>
- * <td>50</td>
- * </tr>
- * <tr>
- * <td><tt>keep-alive</tt></td>
- * <td>The amount of time (seconds) to wait for an event before returning
- * <tt>null</tt> on {@link #take()}.</td>
- * <td>seconds / int</td>
- * <td>3</td>
- * </tr>
- * </table>
- * <p>
- * <b>Metrics</b>
- * </p>
- * <p>
- * TODO
- * </p>
+ * Memory channel that with full transaction support
+ * Uses transaction object for each thread (source and sink) attached to channel.
+ * The events are stored in the thread safe Dequeue.  * The put and take are directly 
+ * executed in the common queue. Channel has a marker for the last committed event in
+ * order to avoid sink reading uncommitted data.
+ * The transactions keep track of the actions to perform undo when rolled back.
+ *
  */
 public class MemoryChannel implements Channel, Configurable {
 
   private static final Integer defaultCapacity = 50;
   private static final Integer defaultKeepAlive = 3;
 
-  private BlockingQueue<Event> queue;
+  // wrap the event with a counter
+  private class StampedEvent {
+    private int timeStamp;
+    private Event event;
+
+    public StampedEvent (int stamp, Event E) {
+      timeStamp = stamp;
+      event = E;
+    }
+
+    public int getStamp() {
+      return timeStamp;
+    }
+
+    public Event getEvent() {
+      return event;
+    }
+
+  }
+
+  /* transaction class
+   * maintain a 'undo' list for put/take from the queue. 
+   * The rollback performs undo of the operations using these lists.
+   * Also maintain a stamp/counter for commit and last take.
+   * This is used to ensure that a transaction doesn't read
+   * uncommitted events. 
+   */
+  public class MemTransaction implements Transaction {
+    private int putStamp;
+    private int takeStamp;
+    private LinkedList<StampedEvent> undoTakeList;
+    private LinkedList<StampedEvent> undoPutList;
+    private TransactionState txnState;
+
+    public MemTransaction () {
+      txnState = TransactionState.Closed;
+    }
+
+    @Override
+    /** 
+     * Start the transaction 
+     *  initialize the undo lists, stamps
+     *  set transaction state to Started
+     */
+    public void begin() {
+      undoTakeList = new LinkedList<StampedEvent>();
+      undoPutList = new LinkedList<StampedEvent> ();
+      putStamp = 0;
+      takeStamp = 0;
+      txnState = TransactionState.Started;
+    }
+
+    @Override
+    /**
+     * Commit the transaction
+     *  If there was an event added by this transaction, then set the 
+     *  commit stamp set transaction state to Committed
+     */
+    public void commit() {
+      Preconditions.checkArgument(txnState == TransactionState.Started, 
+          "transaction not started");
+      // if the txn put any events, then update the channel's stamp and
+      // signal for availability of committed data in the queue
+      if (putStamp != 0) {
+         lastCommitStamp.set(putStamp);
+         lock.lock();
+         hasData.signal();
+         lock.unlock();
+      }
+      txnState = TransactionState.Committed;
+      undoPutList.clear();
+      undoTakeList.clear();
+    }
+
+    @Override
+    /**
+     * Rollback the transaction
+     *  execute the channel's undoXXX to undo the actions done by this txn
+     *  set transaction state to rolled back
+     */
+    public void rollback() {
+      Preconditions.checkArgument(txnState == TransactionState.Started, 
+          "transaction not started");
+      undoPut(this);
+      undoTake(this);
+      txnState = TransactionState.RolledBack;
+    }
+
+    @Override
+    /** 
+     * Close the transaction
+     *  if the transaction is still open, then roll it back
+     *  set transaction state to Closed
+     */
+    public void close() {
+      if (txnState == TransactionState.Started) {
+        rollback();
+      }
+      txnState = TransactionState.Closed;
+      forgetTransaction(this);
+    }
+
+    protected int lastTakeStamp() {
+      return takeStamp;
+    }
+    
+    protected void logPut(StampedEvent e, int stamp) {
+      undoPutList.addLast(e);
+      putStamp = stamp;
+    }
+
+    protected void logTake(StampedEvent e, int stamp) {
+      undoTakeList.addLast(e);
+      takeStamp = stamp;
+    }
+    
+    protected StampedEvent removePut() {
+      if (undoPutList.isEmpty()) {
+        return null;
+      } else {
+        return undoPutList.removeLast();
+      }
+    }
+
+    protected StampedEvent removeTake() {
+      if (undoTakeList.isEmpty()) {
+        return null;
+      } else {
+        return undoTakeList.removeLast();
+      }
+    }
+
+  }
+
+  // The main event queue
+  private LinkedBlockingDeque<StampedEvent> queue;
+  
+  private AtomicInteger currentStamp; // operation counter
+  private AtomicInteger lastCommitStamp; // counter for the last commit
+  private ConcurrentHashMap<Long, MemTransaction> txnMap; // open transactions
   private Integer keepAlive;
+  final Lock lock = new ReentrantLock();
+  final Condition hasData  = lock.newCondition(); 
 
+  /** 
+   * Channel constructor
+   */
+  public MemoryChannel() {
+    currentStamp = new AtomicInteger(1);
+    lastCommitStamp = new AtomicInteger(0);
+    txnMap = new ConcurrentHashMap<Long, MemTransaction>();
+  }
+
+  /** 
+   * set the event queue capacity
+   */
   @Override
   public void configure(Context context) {
-    Integer capacity = context.get("capacity", Integer.class);
-    keepAlive = context.get("keep-alive", Integer.class);
 
+    Integer capacity = context.get("capacity", Integer.class);
     if (capacity == null) {
       capacity = defaultCapacity;
     }
 
+    keepAlive = context.get("keep-alive", Integer.class);
     if (keepAlive == null) {
       keepAlive = defaultKeepAlive;
     }
 
-    queue = new ArrayBlockingQueue<Event>(capacity);
+    queue = new LinkedBlockingDeque<StampedEvent>(capacity);
   }
-
+  
   @Override
+  /** 
+   * Add the given event to the end of the queue
+   * save the event in the undoPut queue for possible rollback
+   * save the stamp of this put for commit
+   */
   public void put(Event event) {
     Preconditions.checkState(queue != null,
         "No queue defined (Did you forget to configure me?");
 
     try {
-      queue.put(event);
+      MemTransaction myTxn = findTransaction();
+      Preconditions.checkState(myTxn != null, "Transaction not started");
+
+      int myStamp = currentStamp.getAndIncrement();
+      StampedEvent stampedEvent = new StampedEvent(myStamp, event);
+      queue.put(stampedEvent);
+      myTxn.logPut(stampedEvent, myStamp);
+      
     } catch (InterruptedException ex) {
       throw new ChannelException("Failed to put(" + event + ")", ex);
     }
   }
 
+  /**
+   * undo of put
+   *  for all the events in the undoPut queue, remove those from the event queue
+   * @param myTxn
+   */
+  protected void undoPut(MemTransaction myTxn ) {
+    StampedEvent undoEvent;
+    StampedEvent currentEvent;
+
+    while ((undoEvent = myTxn.removePut()) != null) {
+      currentEvent = queue.removeLast();
+      Preconditions.checkNotNull(currentEvent, "Rollback error");
+      Preconditions.checkArgument(currentEvent == undoEvent ,
+          "Rollback error");
+    }
+  }
+  
   @Override
+  /**
+   * remove the event from the top of the queue and return it
+   * also add that event to undoTake queue for possible rollback
+   */
   public Event take() {
-    Preconditions.checkState(queue != null,
-        "No queue defined (Did you forget to configure me?");
+    Preconditions.checkState(queue != null, "Queue not configured");
 
     try {
-      return queue.poll(keepAlive, TimeUnit.SECONDS);
+      MemTransaction myTxn = findTransaction();
+      Preconditions.checkState(myTxn != null, "Transaction not started");     
+      Event event = null;
+      int timeout = keepAlive;
+
+      // wait for some committed data be there in the queue
+      if ((timeout > 0) && (myTxn.lastTakeStamp() == lastCommitStamp.get())) {
+        lock.lock();
+        hasData.await(timeout, TimeUnit.SECONDS);
+        lock.unlock();
+        timeout = 0; // don't wait any further
+      }
+
+      // don't go past the last committed element
+      if (myTxn.lastTakeStamp() != lastCommitStamp.get()) {
+        StampedEvent e = queue.poll(timeout, TimeUnit.SECONDS);
+        if (e != null) {
+           myTxn.logTake(e, e.getStamp());
+           event = e.getEvent();           
+         }
+      }
+      return event;
     } catch (InterruptedException ex) {
       throw new ChannelException("Failed to take()", ex);
     }
   }
 
-  @Override
-  public Transaction getTransaction() {
-    return NoOpTransaction.sharedInstance();
-  }
-
   /**
-   * <p>
-   * A no-op transaction implementation that does nothing at all.
-   * </p>
+   * undo of take operation
+   * for each event in the undoTake list, add it back to the event queue
+   * @param myTxn
    */
-  public static class NoOpTransaction implements Transaction {
-
-    private static NoOpTransaction sharedInstance;
-
-    public static Transaction sharedInstance() {
-      if (sharedInstance == null) {
-        sharedInstance = new NoOpTransaction();
-      }
+  protected void undoTake(MemTransaction myTxn) {
+    StampedEvent e;
 
-      return sharedInstance;
-    }
-
-    @Override
-    public void begin() {
-    }
-
-    @Override
-    public void commit() {
+    while ((e = myTxn.removeTake()) != null) {
+      queue.addFirst(e);
     }
+  }
+  
+  @Override
+  /**
+   * Return the channel's transaction
+   */
+  public Transaction getTransaction() {
+    MemTransaction txn;
 
-    @Override
-    public void rollback() {
+    // check if there's already a transaction created for this thread
+    txn = findTransaction();
+    
+    // Create a new transaction
+    if (txn == null) {
+      txn = new MemTransaction();
+      txnMap.put(Thread.currentThread().getId(), txn);
     }
+    return txn;
+  }
 
-    @Override
-    public void close() {
+  /**
+   * Remove the given transaction from the list of open transactions
+   * @param myTxn
+   */
+  protected void forgetTransaction(MemTransaction myTxn) {
+    MemTransaction currTxn = findTransaction();
+    Preconditions.checkArgument(myTxn == currTxn, "Wrong transaction to close");
+    txnMap.remove(Thread.currentThread().getId());
+  }
+  
+  // lookup the transaction for the current thread
+  protected MemTransaction findTransaction() {
+    try {
+      return txnMap.get(Thread.currentThread().getId());
+    } catch (NullPointerException eN) {
+      return null;
     }
   }
 
@@ -176,3 +361,4 @@ public class MemoryChannel implements Ch
     return null;
   }
 }
+

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java?rev=1185412&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java Mon Oct 17 22:35:52 2011
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * <p>
+ * A capacity-capped {@link Channel} implementation that supports in-memory
+ * buffering and delivery of events.
+ * </p>
+ * <p>
+ * This channel is appropriate for
+ * <q>best effort</q> delivery of events where high throughput is favored over
+ * data durability. To be clear, <b>this channel offers absolutely no guarantee
+ * of event delivery</b> in the face of (any) component failure.
+ * </p>
+ * <p>
+ * TODO: Discuss guarantees, corner cases re: potential data loss (e.g. consumer
+ * begins a tx, takes events, and gets SIGKILL before rollback).
+ * </p>
+ * <p>
+ * <b>Configuration options</b>
+ * </p>
+ * <table>
+ * <tr>
+ * <th>Parameter</th>
+ * <th>Description</th>
+ * <th>Unit / Type</th>
+ * <th>Default</th>
+ * </tr>
+ * <tr>
+ * <td><tt>capacity</tt></td>
+ * <td>The in-memory capacity of this channel. Store up to <tt>capacity</tt>
+ * events before refusing new events.</td>
+ * <td>events / int</td>
+ * <td>50</td>
+ * </tr>
+ * <tr>
+ * <td><tt>keep-alive</tt></td>
+ * <td>The amount of time (seconds) to wait for an event before returning
+ * <tt>null</tt> on {@link #take()}.</td>
+ * <td>seconds / int</td>
+ * <td>3</td>
+ * </tr>
+ * </table>
+ * <p>
+ * <b>Metrics</b>
+ * </p>
+ * <p>
+ * TODO
+ * </p>
+ */
+public class PseudoTxnMemoryChannel implements Channel, Configurable {
+
+  private static final Integer defaultCapacity = 50;
+  private static final Integer defaultKeepAlive = 3;
+
+  private BlockingQueue<Event> queue;
+  private Integer keepAlive;
+
+  @Override
+  public void configure(Context context) {
+    Integer capacity = context.get("capacity", Integer.class);
+    keepAlive = context.get("keep-alive", Integer.class);
+
+    if (capacity == null) {
+      capacity = defaultCapacity;
+    }
+
+    if (keepAlive == null) {
+      keepAlive = defaultKeepAlive;
+    }
+
+    queue = new ArrayBlockingQueue<Event>(capacity);
+  }
+
+  @Override
+  public void put(Event event) {
+    Preconditions.checkState(queue != null,
+        "No queue defined (Did you forget to configure me?");
+
+    try {
+      queue.put(event);
+    } catch (InterruptedException ex) {
+      throw new ChannelException("Failed to put(" + event + ")", ex);
+    }
+  }
+
+  @Override
+  public Event take() {
+    Preconditions.checkState(queue != null,
+        "No queue defined (Did you forget to configure me?");
+
+    try {
+      return queue.poll(keepAlive, TimeUnit.SECONDS);
+    } catch (InterruptedException ex) {
+      throw new ChannelException("Failed to take()", ex);
+    }
+  }
+
+  @Override
+  public Transaction getTransaction() {
+    return NoOpTransaction.sharedInstance();
+  }
+
+  /**
+   * <p>
+   * A no-op transaction implementation that does nothing at all.
+   * </p>
+   */
+  public static class NoOpTransaction implements Transaction {
+
+    private static NoOpTransaction sharedInstance;
+
+    public static Transaction sharedInstance() {
+      if (sharedInstance == null) {
+        sharedInstance = new NoOpTransaction();
+      }
+
+      return sharedInstance;
+    }
+
+    @Override
+    public void begin() {
+    }
+
+    @Override
+    public void commit() {
+    }
+
+    @Override
+    public void rollback() {
+    }
+
+    @Override
+    public void close() {
+    }
+  }
+
+  @Override
+  public void shutdown() {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public String getName() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+}

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java?rev=1185412&r1=1185411&r2=1185412&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java Mon Oct 17 22:35:52 2011
@@ -16,7 +16,7 @@ public class TestMemoryChannelTransactio
 
   @Before
   public void setUp() {
-    channel = new MultiOpMemChannel();
+    channel = new MemoryChannel();
   }
 
   @Test

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java?rev=1185412&r1=1185411&r2=1185412&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java Mon Oct 17 22:35:52 2011
@@ -13,6 +13,7 @@ import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.PollableSink;
+import org.apache.flume.Transaction;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
@@ -83,9 +84,14 @@ public class TestAvroSink {
     Assert.assertTrue(LifecycleController.waitForOneOf(sink,
         LifecycleState.START_OR_ERROR, 5000));
 
+    Transaction transaction = channel.getTransaction();
+
+    transaction.begin();
     for (int i = 0; i < 10; i++) {
       channel.put(event);
     }
+    transaction.commit();
+    transaction.close();
 
     for (int i = 0; i < 5; i++) {
       PollableSink.Status status = sink.process();

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java?rev=1185412&r1=1185411&r2=1185412&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java Mon Oct 17 22:35:52 2011
@@ -4,7 +4,7 @@ import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
-import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.PseudoTxnMemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.lifecycle.LifecycleException;
@@ -27,7 +27,7 @@ public class TestLoggerSink {
   public void testAppend() throws InterruptedException, LifecycleException,
       EventDeliveryException {
 
-    Channel channel = new MemoryChannel();
+    Channel channel = new PseudoTxnMemoryChannel();
     Context context = new Context();
     Configurables.configure(channel, context);
     Configurables.configure(sink, context);

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java?rev=1185412&r1=1185411&r2=1185412&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java Mon Oct 17 22:35:52 2011
@@ -9,7 +9,7 @@ import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
-import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.PseudoTxnMemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.SimpleEvent;
 import org.apache.flume.lifecycle.LifecycleException;
@@ -65,7 +65,7 @@ public class TestRollingFileSink {
 
     Configurables.configure(sink, context);
 
-    Channel channel = new MemoryChannel();
+    Channel channel = new PseudoTxnMemoryChannel();
     Configurables.configure(channel, context);
 
     sink.setChannel(channel);

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java?rev=1185412&r1=1185411&r2=1185412&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java Mon Oct 17 22:35:52 2011
@@ -10,6 +10,7 @@ import org.apache.avro.ipc.specific.Spec
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
+import org.apache.flume.Transaction;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.lifecycle.LifecycleController;
@@ -120,11 +121,17 @@ public class TestAvroSource {
     Status status = client.append(avroEvent);
 
     Assert.assertEquals(Status.OK, status);
+ 
+    Transaction transaction = channel.getTransaction();
+    transaction.begin();
 
     Event event = channel.take();
     Assert.assertNotNull(event);
     Assert.assertEquals("Channel contained our event", "Hello avro",
         new String(event.getBody()));
+    transaction.commit();
+    transaction.close();
+
 
     logger.debug("Round trip event:{}", event);
 

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java?rev=1185412&r1=1185411&r2=1185412&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java Mon Oct 17 22:35:52 2011
@@ -5,7 +5,7 @@ import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.PollableSource;
-import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.PseudoTxnMemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.lifecycle.LifecycleException;
 import org.junit.Assert;
@@ -25,7 +25,7 @@ public class TestSequenceGeneratorSource
   public void testProcess() throws InterruptedException, LifecycleException,
       EventDeliveryException {
 
-    Channel channel = new MemoryChannel();
+    Channel channel = new PseudoTxnMemoryChannel();
     Context context = new Context();
 
     context.put("logicalNode.name", "test");
@@ -48,7 +48,7 @@ public class TestSequenceGeneratorSource
   public void testLifecycle() throws InterruptedException,
       EventDeliveryException {
 
-    Channel channel = new MemoryChannel();
+    Channel channel = new PseudoTxnMemoryChannel();
     Context context = new Context();
 
     context.put("logicalNode.name", "test");

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java?rev=1185412&r1=1185411&r2=1185412&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java Mon Oct 17 22:35:52 2011
@@ -14,6 +14,7 @@ import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.EventDrivenSource;
+import org.apache.flume.Transaction;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.lifecycle.LifecycleException;
@@ -78,6 +79,9 @@ public class TestNetcatSource {
 
     };
 
+    Transaction tx = source.getChannel().getTransaction();
+    tx.begin();
+
     for (int i = 0; i < 100; i++) {
       executor.submit(clientRequestRunnable);
 
@@ -87,6 +91,8 @@ public class TestNetcatSource {
       Assert.assertArrayEquals("Test message".getBytes(), event.getBody());
     }
 
+    tx.commit();
+    tx.close();
     executor.shutdown();
 
     while (!executor.isTerminated()) {

Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java?rev=1185412&r1=1185411&r2=1185412&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java Mon Oct 17 22:35:52 2011
@@ -30,7 +30,6 @@ import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Transaction;
 import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.channel.MultiOpMemChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.SimpleEvent;
 import org.apache.flume.lifecycle.LifecycleException;
@@ -146,12 +145,11 @@ public class TestHDFSEventSink extends T
     sink.setChannel(channel);
     sink.start();
 
-    Transaction txn = channel.getTransaction();
-
     Calendar eventDate = Calendar.getInstance();
 
     // push the event batches into channel
     for (i = 1; i < 4; i++) {
+      Transaction txn = channel.getTransaction();
       txn.begin();
       for (j = 1; j <= txnMax; j++) {
         Event event = new SimpleEvent();
@@ -166,6 +164,7 @@ public class TestHDFSEventSink extends T
         totalEvents++;
       }
       txn.commit();
+      txn.close();
 
       // execute sink to process the events
       sink.process();
@@ -235,7 +234,7 @@ public class TestHDFSEventSink extends T
 
     Configurables.configure(sink, context);
 
-    Channel channel = new MultiOpMemChannel();
+    Channel channel = new MemoryChannel();
     Configurables.configure(channel, context);
 
     sink.setChannel(channel);
@@ -335,12 +334,11 @@ public class TestHDFSEventSink extends T
     sink.setChannel(channel);
     sink.start();
 
-    Transaction txn = channel.getTransaction();
-
     Calendar eventDate = Calendar.getInstance();
 
     // push the event batches into channel
     for (int i = 1; i < 4; i++) {
+      Transaction txn = channel.getTransaction();
       txn.begin();
       for (int j = 1; j <= txnMax; j++) {
         Event event = new SimpleEvent();
@@ -354,6 +352,7 @@ public class TestHDFSEventSink extends T
         channel.put(event);
       }
       txn.commit();
+      txn.close();
 
       // execute sink to process the events
       sink.process();
@@ -414,7 +413,7 @@ public class TestHDFSEventSink extends T
 
     Configurables.configure(sink, context);
 
-    Channel channel = new MultiOpMemChannel();
+    Channel channel = new MemoryChannel();
     Configurables.configure(channel, context);
 
     sink.setChannel(channel);