You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/02/13 22:32:42 UTC

svn commit: r1243703 - in /incubator/flume/branches/flume-728/flume-ng-core/src: main/java/org/apache/flume/ main/java/org/apache/flume/channel/ test/java/org/apache/flume/channel/

Author: arvind
Date: Mon Feb 13 21:32:42 2012
New Revision: 1243703

URL: http://svn.apache.org/viewvc?rev=1243703&view=rev
Log:
FLUME-935. Create abstract implementations  of basic channel/transaction semantics.

(Peter Newcomb via Arvind Prabhakar)

Added:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/BasicChannelSemantics.java   (with props)
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/BasicTransactionSemantics.java   (with props)
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelUtils.java   (with props)
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java   (with props)
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestBasicChannelSemantics.java   (with props)
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelUtils.java   (with props)
Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelException.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelException.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelException.java?rev=1243703&r1=1243702&r2=1243703&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelException.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelException.java Mon Feb 13 21:32:42 2012
@@ -34,7 +34,7 @@ public class ChannelException extends Ru
   /**
    * @param ex the causal exception
    */
-  public ChannelException(Exception ex) {
+  public ChannelException(Throwable ex) {
     super(ex);
   }
 
@@ -42,7 +42,7 @@ public class ChannelException extends Ru
    * @param message the exception message
    * @param ex the causal exception
    */
-  public ChannelException(String message, Exception ex) {
+  public ChannelException(String message, Throwable ex) {
     super(message, ex);
   }
 

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/BasicChannelSemantics.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/BasicChannelSemantics.java?rev=1243703&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/BasicChannelSemantics.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/BasicChannelSemantics.java Mon Feb 13 21:32:42 2012
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.channel;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * <p>
+ * An implementation of basic {@link Channel} semantics, including the
+ * implied thread-local semantics of the {@link Transaction} class,
+ * which is required to extend {@link BasicTransactionSemantics}.
+ * </p>
+ */
+public abstract class BasicChannelSemantics extends AbstractChannel {
+
+  private ThreadLocal<BasicTransactionSemantics> currentTransaction
+      = new ThreadLocal<BasicTransactionSemantics>();
+
+  private boolean initialized = false;
+
+  /**
+   * <p>
+   * Called upon first getTransaction() request, while synchronized on
+   * this {@link Channel} instance.  Use this method to delay the
+   * initializization resources until just before the first
+   * transaction begins.
+   * </p>
+   */
+  protected void initialize() {}
+
+  /**
+   * <p>
+   * Called to create new {@link Transaction} objects, which must
+   * extend {@link BasicTransactionSemantics}.  Each object is used
+   * for only one transaction, but is stored in a thread-local and
+   * retrieved by <code>getTransaction</code> for the duration of that
+   * transaction.
+   * </p>
+   */
+  protected abstract BasicTransactionSemantics createTransaction();
+
+  /**
+   * <p>
+   * Ensures that a transaction exists for this thread and then
+   * delegates the <code>put</code> to the thread's {@link
+   * BasicTransactionSemantics} instance.
+   * </p>
+   */
+  @Override
+  public void put(Event event) throws ChannelException {
+    BasicTransactionSemantics transaction = currentTransaction.get();
+    Preconditions.checkState(transaction != null,
+        "No transaction exists for this thread");
+    transaction.put(event);
+  }
+
+  /**
+   * <p>
+   * Ensures that a transaction exists for this thread and then
+   * delegates the <code>take</code> to the thread's {@link
+   * BasicTransactionSemantics} instance.
+   * </p>
+   */
+  @Override
+  public Event take() throws ChannelException {
+    BasicTransactionSemantics transaction = currentTransaction.get();
+    Preconditions.checkState(transaction != null,
+        "No transaction exists for this thread");
+    return transaction.take();
+  }
+
+  /**
+   * <p>
+   * Initializes the channel if it is not already, then checks to see
+   * if there is an open transaction for this thread, creating a new
+   * one via <code>createTransaction</code> if not.
+   * @return the current <code>Transaction</code> object for the
+   *     calling thread
+   * </p>
+   */
+  @Override
+  public Transaction getTransaction() {
+
+    if (!initialized) {
+      synchronized (this) {
+        if (!initialized) {
+          initialize();
+          initialized = true;
+        }
+      }
+    }
+
+    BasicTransactionSemantics transaction = currentTransaction.get();
+    if (transaction == null || transaction.getState().equals(
+            BasicTransactionSemantics.State.CLOSED)) {
+      transaction = createTransaction();
+      currentTransaction.set(transaction);
+    }
+    return transaction;
+  }
+}

Propchange: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/BasicChannelSemantics.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/BasicTransactionSemantics.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/BasicTransactionSemantics.java?rev=1243703&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/BasicTransactionSemantics.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/BasicTransactionSemantics.java Mon Feb 13 21:32:42 2012
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.channel;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * <p>
+ * An implementation of basic {@link Transaction} semantics designed
+ * to work in concert with {@link BasicChannelSemantics} to simplify
+ * creation of robust {@link Channel} implementations.  This class
+ * ensures that each transaction implementation method is called only
+ * while the transaction is in the correct state for that method, and
+ * only by the thread that created the transaction.  Nested calls to
+ * <code>begin()</code> and <code>close()</code> are supported as long
+ * as they are balanced.
+ * </p>
+ * <p>
+ * Subclasses need only implement <code>doPut</code>,
+ * <code>doTake</code>, <code>doCommit</code>, and
+ * <code>doRollback</code>, and the developer can rest assured that
+ * those methods are called only after transaction state preconditions
+ * have been properly met.  <code>doBegin</code> and
+ * <code>doClose</code> may also be implemented if there is work to be
+ * done at those points.
+ * </p>
+ * <p>
+ * All InterruptedException exceptions thrown from the implementations
+ * of the <code>doXXX</code> methods are automatically wrapped to
+ * become ChannelExceptions, but only after restoring the interrupted
+ * status of the thread so that any subsequent blocking method calls
+ * will themselves throw InterruptedException rather than blocking.
+ * The exception to this rule is <code>doTake</code>, which simply
+ * returns null instead of wrapping and propagating the
+ * InterruptedException, though it still first restores the
+ * interrupted status of the thread.
+ * </p>
+ */
+public abstract class BasicTransactionSemantics implements Transaction {
+
+  private State state;
+  private long initialThreadId;
+
+  protected void doBegin() throws InterruptedException {}
+  protected abstract void doPut(Event event) throws InterruptedException;
+  protected abstract Event doTake() throws InterruptedException;
+  protected abstract void doCommit() throws InterruptedException;
+  protected abstract void doRollback() throws InterruptedException;
+  protected void doClose() {}
+
+  protected BasicTransactionSemantics() {
+    state = State.NEW;
+    initialThreadId = Thread.currentThread().getId();
+  }
+
+  /**
+   * <p>
+   * The method to which {@link BasicChannelSemantics} delegates calls
+   * to <code>put</code>.
+   * </p>
+   */
+  protected void put(Event event) {
+    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
+        "put() called from different thread than getTransaction()!");
+    Preconditions.checkState(state.equals(State.OPEN),
+        "put() called when transaction is %s!", state);
+    Preconditions.checkArgument(event != null,
+        "put() called with null event!");
+
+    try {
+      doPut(event);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new ChannelException(e.toString(), e);
+    }
+  }
+
+  /**
+   * <p>
+   * The method to which {@link BasicChannelSemantics} delegates calls
+   * to <code>take</code>.
+   * </p>
+   */
+  protected Event take() {
+    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
+        "take() called from different thread than getTransaction()!");
+    Preconditions.checkState(state.equals(State.OPEN),
+        "take() called when transaction is %s!", state);
+
+    try {
+      return doTake();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return null;
+    }
+  }
+
+  /**
+   * @return the current state of the transaction
+   */
+  protected State getState() {
+    return state;
+  }
+
+  @Override
+  public void begin() {
+    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
+        "begin() called from different thread than getTransaction()!");
+    Preconditions.checkState(state.equals(State.NEW),
+        "begin() called when transaction is " + state + "!");
+
+    try {
+      doBegin();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new ChannelException(e.toString(), e);
+    }
+    state = State.OPEN;
+  }
+
+  @Override
+  public void commit() {
+    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
+        "commit() called from different thread than getTransaction()!");
+    Preconditions.checkState(state.equals(State.OPEN),
+        "commit() called when transaction is %s!", state);
+
+    try {
+      doCommit();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new ChannelException(e.toString(), e);
+    }
+    state = State.COMPLETED;
+  }
+
+  @Override
+  public void rollback() {
+    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
+        "rollback() called from different thread than getTransaction()!");
+    Preconditions.checkState(state.equals(State.OPEN),
+        "rollback() called when transaction is %s!", state);
+
+    state = State.COMPLETED;
+    try {
+      doRollback();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new ChannelException(e.toString(), e);
+    }
+  }
+
+  @Override
+  public void close() {
+    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
+        "close() called from different thread than getTransaction()!");
+    Preconditions.checkState(
+            state.equals(State.NEW) || state.equals(State.COMPLETED),
+            "close() called when transaction is %s"
+            + " - you must either commit or rollback first", state);
+
+    state = State.CLOSED;
+    doClose();
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append("BasicTransactionSemantics: {");
+    builder.append(" state:").append(state);
+    builder.append(" initialThreadId:").append(initialThreadId);
+    builder.append(" }");
+    return builder.toString();
+  }
+
+  /**
+   * <p>
+   * The state of the {@link Transaction} to which it belongs.
+   * </p>
+   * <dl>
+   * <dt>NEW</dt>
+   * <dd>A newly created transaction that has not yet begun.</dd>
+   * <dt>OPEN</dt>
+   * <dd>A transaction that is open. It is permissible to commit or rollback.
+   * </dd>
+   * <dt>COMPLETED</dt>
+   * <dd>This transaction has been committed or rolled back. It is illegal to
+   * perform any further operations beyond closing it.</dd>
+   * <dt>CLOSED</dt>
+   * <dd>A closed transaction. No further operations are permitted.</dd>
+   */
+  protected static enum State {
+    NEW, OPEN, COMPLETED, CLOSED
+  }
+}

Propchange: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/BasicTransactionSemantics.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelUtils.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelUtils.java?rev=1243703&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelUtils.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelUtils.java Mon Feb 13 21:32:42 2012
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.channel;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * A collection of utilities for interacting with {@link Channel}
+ * objects.  Use of these utilities prevents error-prone replication
+ * of required transaction usage semantics, and allows for more
+ * concise code.
+ * </p>
+ * <p>
+ * However, as a side-effect of its generality, and in particular of
+ * its use of {@link Callable}, any checked exceptions thrown by
+ * user-created transactors will be silently wrapped with {@link
+ * ChannelException} before being propagated.  Only direct use of
+ * {@link #transact(Channel,Callable)} suffers from this issue, even
+ * though all other methods are based upon it, because none of the
+ * other methods are capable of producing or allowing checked
+ * exceptions in the first place.
+ * </p>
+ */
+public class ChannelUtils {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(ChannelUtils.class);
+
+  /**
+   * <p>
+   * A convenience method for single-event <code>put</code> transactions.
+   * </p>
+   * @see #transact(Channel,Callable)
+   */
+  public static void put(final Channel channel, final Event event)
+      throws ChannelException {
+    transact(channel, new Runnable() {
+        @Override
+        public void run() {
+          channel.put(event);
+        }
+      });
+  }
+
+  /**
+   * <p>
+   * A convenience method for multiple-event <code>put</code> transactions.
+   * </p>
+   * @see #transact(Channel,Callable)
+   */
+  public static void put(final Channel channel, final Collection<Event> events)
+      throws ChannelException {
+    transact(channel, new Runnable() {
+        @Override
+        public void run() {
+          for (Event event : events) {
+            channel.put(event);
+          }
+        }
+      });
+  }
+
+  /**
+   * <p>
+   * A convenience method for single-event <code>take</code> transactions.
+   * </p>
+   * @return a single event, or null if the channel has none available
+   * @see #transact(Channel,Callable)
+   */
+  public static Event take(final Channel channel)
+      throws ChannelException {
+    return transact(channel, new Callable<Event>() {
+        @Override
+        public Event call() {
+          return channel.take();
+        }
+      });
+  }
+
+  /**
+   * <p>
+   * A convenience method for multiple-event <code>take</code> transactions.
+   * </p>
+   * @return a list of at most <code>max</code> events
+   * @see #transact(Channel,Callable)
+   */
+  public static List<Event> take(final Channel channel, final int max)
+      throws ChannelException {
+    return transact(channel, new Callable<List<Event>>() {
+        @Override
+        public List<Event> call() {
+          List<Event> events = new ArrayList<Event>(max);
+          while (events.size() < max) {
+            Event event = channel.take();
+            if (event == null) {
+              break;
+            }
+            events.add(event);
+          }
+          return events;
+        }
+      });
+  }
+
+  /**
+   * <p>
+   * A convenience method for transactions that don't require a return
+   * value.  Simply wraps the <code>transactor</code> using {@link
+   * Executors#callable} and passes that to {@link
+   * #transact(Channel,Callable)}.
+   * </p>
+   * @see #transact(Channel,Callable)
+   * @see Executors#callable(Runnable)
+   */
+  public static void transact(Channel channel, Runnable transactor)
+      throws ChannelException {
+    transact(channel, Executors.callable(transactor));
+  }
+
+  /**
+   * <p>
+   * A general optimistic implementation of {@link Transaction} client
+   * semantics.  It gets a new transaction object from the
+   * <code>channel</code>, calls <code>begin()</code> on it, and then
+   * invokes the supplied <code>transactor</code> object.  If an
+   * exception is thrown, then the transaction is rolled back;
+   * otherwise the transaction is committed and the value returned by
+   * the <code>transactor</code> is returned.  In either case, the
+   * transaction is closed before the function exits.  All secondary
+   * exceptions (i.e. those thrown by
+   * <code>Transaction.rollback()</code> or
+   * <code>Transaction.close()</code> while recovering from an earlier
+   * exception) are logged, allowing the original exception to be
+   * propagated instead.
+   * </p>
+   * <p>
+   * This implementation is optimistic in that it expects transaction
+   * rollback to be infrequent: it will rollback a transaction only
+   * when the supplied <code>transactor</code> throws an exception,
+   * and exceptions are a fairly heavyweight mechanism for handling
+   * frequently-occurring events.
+   * </p>
+   * @return the value returned by <code>transactor.call()</code>
+   */
+  public static <T> T transact(Channel channel, Callable<T> transactor)
+      throws ChannelException {
+    Transaction transaction = channel.getTransaction();
+    boolean committed = false;
+    boolean interrupted = false;
+    try {
+      transaction.begin();
+      T value = transactor.call();
+      transaction.commit();
+      committed = true;
+      return value;
+    } catch (Throwable e) {
+      interrupted = Thread.currentThread().isInterrupted();
+      try {
+        transaction.rollback();
+      } catch (Throwable e2) {
+        logger.error("Failed to roll back transaction, exception follows:", e2);
+      }
+      if (e instanceof InterruptedException) {
+        interrupted = true;
+      } else if (e instanceof Error) {
+        throw (Error) e;
+      } else if (e instanceof RuntimeException) {
+        throw (RuntimeException) e;
+      }
+      throw new ChannelException(e);
+    } finally {
+      interrupted = interrupted || Thread.currentThread().isInterrupted();
+      try {
+        transaction.close();
+      } catch (Throwable e) {
+        if (committed) {
+          if (e instanceof Error) {
+            throw (Error) e;
+          } else if (e instanceof RuntimeException) {
+            throw (RuntimeException) e;
+          } else {
+            throw new ChannelException(e);
+          }
+        } else {
+          logger.error(
+              "Failed to close transaction after error, exception follows:", e);
+        }
+      } finally {
+        if (interrupted) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+  }
+
+  /** Disallows instantiation */
+  private ChannelUtils() {}
+}

Propchange: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelUtils.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java?rev=1243703&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java Mon Feb 13 21:32:42 2012
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.channel;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.flume.ChannelException;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import com.google.common.base.Preconditions;
+
+public abstract class AbstractBasicChannelSemanticsTest {
+
+  protected static List<Event> events;
+  static {
+    Event[] array = new Event[7];
+    for (int i = 0; i < array.length; ++i) {
+      array[i] = EventBuilder.withBody(("test event " + i).getBytes());
+    }
+    events = Collections.unmodifiableList(Arrays.asList(array));
+  }
+
+  protected ExecutorService executor = null;
+  protected TestChannel channel = null;
+
+  protected static class TestChannel extends BasicChannelSemantics {
+
+    private Queue<Event> queue = new ArrayDeque<Event>();
+
+    public enum Mode {
+      NORMAL,
+      THROW_ERROR,
+      THROW_RUNTIME,
+      THROW_CHANNEL,
+      SLEEP
+    };
+
+    private Mode mode = Mode.NORMAL;
+    private boolean lastTransactionCommitted = false;
+    private boolean lastTransactionRolledBack = false;
+    private boolean lastTransactionClosed = false;
+
+    public Mode getMode() {
+      return mode;
+    }
+
+    public void setMode(Mode mode) {
+      this.mode = mode;
+    }
+
+    public boolean wasLastTransactionCommitted() {
+      return lastTransactionCommitted;
+    }
+
+    public boolean wasLastTransactionRolledBack() {
+      return lastTransactionRolledBack;
+    }
+
+    public boolean wasLastTransactionClosed() {
+      return lastTransactionClosed;
+    }
+
+    @Override
+    protected BasicTransactionSemantics createTransaction() {
+      return new TestTransaction();
+    }
+
+    protected class TestTransaction extends BasicTransactionSemantics {
+
+      protected void doMode() throws InterruptedException {
+        switch (mode) {
+          case THROW_ERROR:
+            throw new TestError();
+          case THROW_RUNTIME:
+            throw new TestRuntimeException();
+          case THROW_CHANNEL:
+            throw new ChannelException("test");
+          case SLEEP:
+            Thread.sleep(300000);
+            break;
+        }
+      }
+
+      @Override
+      protected void doBegin() throws InterruptedException {
+        doMode();
+      }
+
+      @Override
+      protected void doPut(Event event) throws InterruptedException {
+        doMode();
+        synchronized (queue) {
+          queue.add(event);
+        }
+      }
+
+      @Override
+      protected Event doTake() throws InterruptedException {
+        doMode();
+        synchronized (queue) {
+          return queue.poll();
+        }
+      }
+
+      @Override
+      protected void doCommit() throws InterruptedException {
+        doMode();
+        lastTransactionCommitted = true;
+      }
+
+      @Override
+      protected void doRollback() throws InterruptedException {
+        lastTransactionRolledBack = true;
+        doMode();
+      }
+
+      @Override
+      protected void doClose() {
+        lastTransactionClosed = true;
+        Preconditions.checkState(mode != TestChannel.Mode.SLEEP,
+            "doClose() can't throw InterruptedException, so why SLEEP?");
+        try {
+          doMode();
+        } catch (InterruptedException e) {
+          Assert.fail();
+        }
+      }
+    }
+  }
+
+  protected static class TestError extends Error {
+    static final long serialVersionUID = -1;
+  };
+
+  protected static class TestRuntimeException extends RuntimeException {
+    static final long serialVersionUID = -1;
+  };
+
+  protected void testException(Class<? extends Throwable> exceptionClass,
+      Runnable test) {
+    try {
+      test.run();
+      Assert.fail();
+    } catch (Throwable e) {
+      if (exceptionClass == InterruptedException.class
+          && e instanceof ChannelException
+          && e.getCause() instanceof InterruptedException) {
+        Assert.assertTrue(Thread.interrupted());
+      } else if (!exceptionClass.isInstance(e)) {
+        throw new AssertionError(e);
+      }
+    }
+  }
+
+  protected void testIllegalArgument(Runnable test) {
+    testException(IllegalArgumentException.class, test);
+  }
+
+  protected void testIllegalState(Runnable test) {
+    testException(IllegalStateException.class, test);
+  }
+
+  protected void testWrongThread(final Runnable test) throws Exception {
+    executor.submit(new Runnable() {
+        public void run() {
+          testIllegalState(test);
+        }
+      }).get();
+  }
+
+  protected void testMode(TestChannel.Mode mode, Runnable test) {
+    TestChannel.Mode oldMode = channel.getMode();
+    try {
+      channel.setMode(mode);
+      test.run();
+    } finally {
+      channel.setMode(oldMode);
+    }
+  }
+
+  protected void testException(TestChannel.Mode mode,
+      final Class<? extends Throwable> exceptionClass, final Runnable test) {
+    testMode(mode, new Runnable() {
+        public void run() {
+          testException(exceptionClass, test);
+        }
+      });
+  }
+
+  protected void testError(Runnable test) {
+    testException(TestChannel.Mode.THROW_ERROR, TestError.class, test);
+  }
+
+  protected void testRuntimeException(Runnable test) {
+    testException(TestChannel.Mode.THROW_RUNTIME, TestRuntimeException.class,
+        test);
+  }
+
+  protected void testChannelException(Runnable test) {
+    testException(TestChannel.Mode.THROW_CHANNEL, ChannelException.class, test);
+  }
+
+  protected void testInterrupt(final Runnable test) {
+    testMode(TestChannel.Mode.SLEEP, new Runnable() {
+        public void run() {
+          testException(InterruptedException.class, new Runnable() {
+              public void run() {
+                interruptTest(test);
+              }
+            });
+        }
+      });
+  }
+
+  protected void interruptTest(final Runnable test) {
+    final Thread mainThread = Thread.currentThread();
+    Future<?> future = executor.submit(new Runnable() {
+        public void run() {
+          try {
+            Thread.sleep(500);
+          } catch (InterruptedException e) {
+          }
+          mainThread.interrupt();
+        }
+      });
+    test.run();
+    try {
+      future.get();
+    } catch (Exception e) {
+      throw new AssertionError(e);
+    }
+  }
+
+  protected void testExceptions(Runnable test) throws Exception {
+    testWrongThread(test);
+    testBasicExceptions(test);
+    testInterrupt(test);
+  }
+
+  protected void testBasicExceptions(Runnable test) throws Exception {
+    testError(test);
+    testRuntimeException(test);
+    testChannelException(test);
+  }
+
+  @Before
+  public void before() {
+    Preconditions.checkState(channel == null, "test cleanup failed!");
+    Preconditions.checkState(executor == null, "test cleanup failed!");
+    channel = new TestChannel();
+    executor = Executors.newCachedThreadPool();
+  }
+
+  @After
+  public void after() {
+    channel = null;
+    executor.shutdown();
+    executor = null;
+  }
+}

Propchange: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestBasicChannelSemantics.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestBasicChannelSemantics.java?rev=1243703&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestBasicChannelSemantics.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestBasicChannelSemantics.java Mon Feb 13 21:32:42 2012
@@ -0,0 +1,736 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.channel;
+
+import java.util.concurrent.Future;
+
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestBasicChannelSemantics
+    extends AbstractBasicChannelSemanticsTest {
+
+  @Test
+  public void testHappyPath() {
+    for (int i = 0; i < events.size(); ++i) {
+      Transaction transaction = channel.getTransaction();
+      transaction.begin();
+      channel.put(events.get(i));
+      transaction.commit();
+      transaction.close();
+    }
+    for (int i = 0; i < events.size(); ++i) {
+      Transaction transaction = channel.getTransaction();
+      transaction.begin();
+      Assert.assertSame(events.get(i), channel.take());
+      transaction.commit();
+      transaction.close();
+    }
+  }
+
+  @Test
+  public void testMultiThreadedHappyPath() throws Exception {
+    final int testLength = 1000;
+    Future<?> producer = executor.submit(new Runnable() {
+        public void run() {
+          try {
+            Thread.sleep(500);
+            for (int i = 0; i < testLength; ++i) {
+              Transaction transaction = channel.getTransaction();
+              transaction.begin();
+              channel.put(events.get(i % events.size()));
+              transaction.commit();
+              transaction.close();
+              Thread.sleep(1);
+            }
+            Thread.sleep(500);
+          } catch (InterruptedException e) {
+            Assert.fail();
+          }
+        }
+      });
+    int i = 0;
+    while (!producer.isDone()) {
+      Transaction transaction = channel.getTransaction();
+      transaction.begin();
+      Event event = channel.take();
+      if (event != null) {
+        Assert.assertSame(events.get(i % events.size()), event);
+        ++i;
+      }
+      transaction.commit();
+      transaction.close();
+    }
+    Assert.assertEquals(testLength, i);
+    producer.get();
+  }
+
+  @Test
+  public void testGetTransaction() throws Exception {
+    final Transaction transaction = channel.getTransaction();
+
+    executor.submit(new Runnable() {
+        public void run() {
+          Assert.assertNotSame(transaction, channel.getTransaction());
+        }
+      }).get();
+
+    Assert.assertSame(transaction, channel.getTransaction());
+
+    transaction.begin();
+
+    executor.submit(new Runnable() {
+        public void run() {
+          Assert.assertNotSame(transaction, channel.getTransaction());
+        }
+      }).get();
+    Assert.assertSame(transaction, channel.getTransaction());
+
+    transaction.commit();
+
+    executor.submit(new Runnable() {
+        public void run() {
+          Assert.assertNotSame(transaction, channel.getTransaction());
+        }
+      }).get();
+    Assert.assertSame(transaction, channel.getTransaction());
+
+    transaction.close();
+
+    executor.submit(new Runnable() {
+        public void run() {
+          Assert.assertNotSame(transaction, channel.getTransaction());
+        }
+      }).get();
+    Assert.assertNotSame(transaction, channel.getTransaction());
+  }
+
+  @Test
+  public void testBegin() throws Exception {
+    final Transaction transaction = channel.getTransaction();
+
+    testExceptions(new Runnable() {
+        public void run() {
+          transaction.begin();
+        }
+      });
+
+    transaction.begin();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          transaction.begin();
+        }
+      });
+
+    transaction.commit();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          transaction.begin();
+        }
+      });
+
+    transaction.close();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          transaction.begin();
+        }
+      });
+  }
+
+  @Test
+  public void testPut1() throws Exception {
+    testIllegalState(new Runnable() {
+        public void run() {
+          channel.put(events.get(0));
+        }
+      });
+
+    Transaction transaction = channel.getTransaction();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          channel.put(events.get(0));
+        }
+      });
+
+    transaction.begin();
+    channel.put(events.get(0));
+
+    testIllegalArgument(new Runnable() {
+        public void run() {
+          channel.put(null);
+        }
+      });
+
+    testExceptions(new Runnable() {
+        public void run() {
+          channel.put(events.get(0));
+        }
+      });
+
+    transaction.commit();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          channel.put(events.get(0));
+        }
+      });
+
+    transaction.close();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          channel.put(events.get(0));
+        }
+      });
+  }
+
+  @Test
+  public void testPut2() throws Exception {
+    Transaction transaction = channel.getTransaction();
+    transaction.begin();
+    channel.put(events.get(0));
+    transaction.rollback();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          channel.put(events.get(0));
+        }
+      });
+
+    transaction.close();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          channel.put(events.get(0));
+        }
+      });
+  }
+
+  @Test
+  public void testPut3() throws Exception {
+    Transaction transaction = channel.getTransaction();
+    transaction.begin();
+    channel.put(events.get(0));
+
+    final Transaction finalTransaction = transaction;
+    testChannelException(new Runnable() {
+        public void run() {
+          finalTransaction.commit();
+        }
+      });
+
+    transaction.rollback();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          channel.put(events.get(0));
+        }
+      });
+
+    transaction.close();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          channel.put(events.get(0));
+        }
+      });
+  }
+
+  @Test
+  public void testTake1() throws Exception {
+    testIllegalState(new Runnable() {
+        public void run() {
+          channel.take();
+        }
+      });
+
+    Transaction transaction = channel.getTransaction();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          channel.take();
+        }
+      });
+
+    transaction.begin();
+    Assert.assertNull(channel.take());
+
+    for (int i = 0; i < 1000; ++i) {
+      channel.put(events.get(i % events.size()));
+    }
+    Assert.assertNotNull(channel.take());
+
+    testWrongThread(new Runnable() {
+        public void run() {
+          channel.take();
+        }
+      });
+
+    testBasicExceptions(new Runnable() {
+        public void run() {
+          channel.take();
+        }
+      });
+
+    testMode(TestChannel.Mode.SLEEP, new Runnable() {
+        public void run() {
+          interruptTest(new Runnable() {
+              public void run() {
+                Assert.assertNull(channel.take());
+                Assert.assertTrue(Thread.interrupted());
+              }
+            });
+        }
+      });
+
+    Assert.assertNotNull(channel.take());
+
+    transaction.commit();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          channel.take();
+        }
+      });
+
+    transaction.close();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          channel.take();
+        }
+      });
+  }
+
+  @Test
+  public void testTake2() throws Exception {
+    Transaction transaction = channel.getTransaction();
+    transaction.begin();
+    channel.take();
+    transaction.rollback();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          channel.take();
+        }
+      });
+
+    transaction.close();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          channel.take();
+        }
+      });
+  }
+
+  @Test
+  public void testTake3() throws Exception {
+    Transaction transaction = channel.getTransaction();
+    transaction.begin();
+    channel.take();
+
+    final Transaction finalTransaction = transaction;
+    testChannelException(new Runnable() {
+        public void run() {
+          finalTransaction.commit();
+        }
+      });
+
+    transaction.rollback();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          channel.take();
+        }
+      });
+
+    transaction.close();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          channel.take();
+        }
+      });
+  }
+
+  @Test
+  public void testCommit1() throws Exception {
+    final Transaction transaction = channel.getTransaction();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          transaction.commit();
+        }
+      });
+
+    transaction.begin();
+
+    testExceptions(new Runnable() {
+        public void run() {
+          transaction.commit();
+        }
+      });
+
+    transaction.commit();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          transaction.commit();
+        }
+      });
+
+    transaction.close();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          transaction.commit();
+        }
+      });
+  }
+
+  @Test
+  public void testCommit2() throws Exception {
+    final Transaction transaction = channel.getTransaction();
+
+    transaction.begin();
+    transaction.rollback();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          transaction.commit();
+        }
+      });
+
+    transaction.close();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          transaction.commit();
+        }
+      });
+  }
+
+  @Test
+  public void testRollback1() throws Exception {
+    final Transaction transaction = channel.getTransaction();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          transaction.rollback();
+        }
+      });
+
+    transaction.begin();
+
+    testWrongThread(new Runnable() {
+        public void run() {
+          transaction.rollback();
+        }
+      });
+
+    transaction.rollback();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          transaction.rollback();
+        }
+      });
+
+    transaction.close();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          transaction.rollback();
+        }
+      });
+  }
+
+  @Test
+  public void testRollback2() throws Exception {
+    final Transaction transaction = channel.getTransaction();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          transaction.rollback();
+        }
+      });
+
+    transaction.begin();
+
+    testError(new Runnable() {
+        public void run() {
+          transaction.rollback();
+        }
+      });
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          transaction.rollback();
+        }
+      });
+
+    transaction.close();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          transaction.rollback();
+        }
+      });
+  }
+
+  @Test
+  public void testRollback3() throws Exception {
+    final Transaction transaction = channel.getTransaction();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          transaction.rollback();
+        }
+      });
+
+    transaction.begin();
+
+    testRuntimeException(new Runnable() {
+        public void run() {
+          transaction.rollback();
+        }
+      });
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          transaction.rollback();
+        }
+      });
+
+    transaction.close();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          transaction.rollback();
+        }
+      });
+  }
+
+  @Test
+  public void testRollback4() throws Exception {
+    final Transaction transaction = channel.getTransaction();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          transaction.rollback();
+        }
+      });
+
+    transaction.begin();
+
+    testChannelException(new Runnable() {
+        public void run() {
+          transaction.rollback();
+        }
+      });
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          transaction.rollback();
+        }
+      });
+
+    transaction.close();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          transaction.rollback();
+        }
+      });
+  }
+
+
+  @Test
+  public void testRollback5() throws Exception {
+    final Transaction transaction = channel.getTransaction();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          transaction.rollback();
+        }
+      });
+
+    transaction.begin();
+
+    testInterrupt(new Runnable() {
+        public void run() {
+          transaction.rollback();
+        }
+      });
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          transaction.rollback();
+        }
+      });
+
+    transaction.close();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          transaction.rollback();
+        }
+      });
+  }
+
+  @Test
+  public void testRollback6() throws Exception {
+    final Transaction transaction = channel.getTransaction();
+
+    transaction.begin();
+    transaction.commit();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          transaction.rollback();
+        }
+      });
+
+    transaction.close();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          transaction.rollback();
+        }
+      });
+  }
+
+  @Test
+  public void testRollback7() throws Exception {
+    final Transaction transaction = channel.getTransaction();
+
+    transaction.begin();
+
+    testExceptions(new Runnable() {
+        public void run() {
+          transaction.commit();
+        }
+      });
+
+    transaction.rollback();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          transaction.rollback();
+        }
+      });
+
+    transaction.close();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          transaction.rollback();
+        }
+      });
+  }
+
+  @Test
+  public void testClose1() throws Exception {
+    final Transaction transaction = channel.getTransaction();
+
+    testError(new Runnable() {
+        public void run() {
+          transaction.close();
+        }
+      });
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          transaction.close();
+        }
+      });
+  }
+
+  @Test
+  public void testClose2() throws Exception {
+    final Transaction transaction = channel.getTransaction();
+
+    testRuntimeException(new Runnable() {
+        public void run() {
+          transaction.close();
+        }
+      });
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          transaction.close();
+        }
+      });
+  }
+
+  @Test
+  public void testClose3() throws Exception {
+    final Transaction transaction = channel.getTransaction();
+
+    testChannelException(new Runnable() {
+        public void run() {
+          transaction.close();
+        }
+      });
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          transaction.close();
+        }
+      });
+  }
+
+  @Test
+  public void testClose4() throws Exception {
+    final Transaction transaction = channel.getTransaction();
+    transaction.begin();
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          transaction.close();
+        }
+      });
+  }
+
+  @Test
+  public void testClose5() throws Exception {
+    final Transaction transaction = channel.getTransaction();
+    transaction.begin();
+
+    testChannelException(new Runnable() {
+        public void run() {
+          transaction.commit();
+        }
+      });
+
+    testIllegalState(new Runnable() {
+        public void run() {
+          transaction.close();
+        }
+      });
+  }
+}

Propchange: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestBasicChannelSemantics.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelUtils.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelUtils.java?rev=1243703&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelUtils.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelUtils.java Mon Feb 13 21:32:42 2012
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.channel;
+
+import java.util.List;
+
+import org.apache.flume.ChannelException;
+import org.apache.flume.Event;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestChannelUtils
+    extends AbstractBasicChannelSemanticsTest {
+
+  @Test
+  public void testHappyPath1() {
+    ChannelUtils.put(channel, events.get(0));
+    Assert.assertTrue(channel.wasLastTransactionCommitted());
+    Assert.assertFalse(channel.wasLastTransactionRolledBack());
+    Assert.assertTrue(channel.wasLastTransactionClosed());
+  }
+
+  @Test
+  public void testHappyPath2() {
+    ChannelUtils.take(channel);
+    Assert.assertTrue(channel.wasLastTransactionCommitted());
+    Assert.assertFalse(channel.wasLastTransactionRolledBack());
+    Assert.assertTrue(channel.wasLastTransactionClosed());
+  }
+
+  @Test
+  public void testHappyPath3() {
+    ChannelUtils.put(channel, events.get(0));
+    Assert.assertSame(events.get(0), ChannelUtils.take(channel));
+  }
+
+  @Test
+  public void testHappyPath4() {
+    for (int i = 0; i < events.size(); ++i) {
+      ChannelUtils.put(channel, events.get(i));
+    }
+    for (int i = 0; i < events.size(); ++i) {
+      Assert.assertSame(events.get(i), ChannelUtils.take(channel));
+    }
+  }
+
+  @Test
+  public void testHappyPath5() {
+    int rounds = 10;
+    for (int i = 0; i < rounds; ++i) {
+      ChannelUtils.put(channel, events);
+    }
+    for (int i = 0; i < rounds; ++i) {
+      List<Event> takenEvents = ChannelUtils.take(channel, events.size());
+      Assert.assertTrue(takenEvents.size() == events.size());
+      for (int j = 0; j < events.size(); ++j) {
+        Assert.assertSame(events.get(j), takenEvents.get(j));
+      }
+    }
+  }
+
+  private void testTransact(final TestChannel.Mode mode,
+      Class<? extends Throwable> exceptionClass, final Runnable test) {
+    testException(exceptionClass, new Runnable() {
+        public void run() {
+          ChannelUtils.transact(channel, new Runnable() {
+              public void run() {
+                testMode(mode, test);
+              }
+            });
+        }
+      });
+    Assert.assertFalse(channel.wasLastTransactionCommitted());
+    Assert.assertTrue(channel.wasLastTransactionRolledBack());
+    Assert.assertTrue(channel.wasLastTransactionClosed());
+  }
+
+  private void testTransact(TestChannel.Mode mode,
+      Class<? extends Throwable> exceptionClass) {
+    testTransact(mode, exceptionClass, new Runnable() {
+        public void run() {
+          channel.put(events.get(0));
+        }
+      });
+  }
+
+  @Test
+  public void testError() {
+    testTransact(TestChannel.Mode.THROW_ERROR, TestError.class);
+  }
+
+  @Test
+  public void testRuntimeException() {
+    testTransact(TestChannel.Mode.THROW_RUNTIME, TestRuntimeException.class);
+  }
+
+  @Test
+  public void testChannelException() {
+    testTransact(TestChannel.Mode.THROW_CHANNEL, ChannelException.class);
+  }
+
+  @Test
+  public void testInterrupt() throws Exception {
+    testTransact(TestChannel.Mode.SLEEP, InterruptedException.class,
+        new Runnable() {
+          public void run() {
+            interruptTest(new Runnable() {
+                public void run() {
+                  channel.put(events.get(0));
+                }
+              });
+          }
+      });
+  }
+}

Propchange: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelUtils.java
------------------------------------------------------------------------------
    svn:eol-style = native