You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/02/13 22:32:42 UTC
svn commit: r1243703 - in
/incubator/flume/branches/flume-728/flume-ng-core/src:
main/java/org/apache/flume/ main/java/org/apache/flume/channel/
test/java/org/apache/flume/channel/
Author: arvind
Date: Mon Feb 13 21:32:42 2012
New Revision: 1243703
URL: http://svn.apache.org/viewvc?rev=1243703&view=rev
Log:
FLUME-935. Create abstract implementations of basic channel/transaction semantics.
(Peter Newcomb via Arvind Prabhakar)
Added:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/BasicChannelSemantics.java (with props)
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/BasicTransactionSemantics.java (with props)
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelUtils.java (with props)
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java (with props)
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestBasicChannelSemantics.java (with props)
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelUtils.java (with props)
Modified:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelException.java
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelException.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelException.java?rev=1243703&r1=1243702&r2=1243703&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelException.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/ChannelException.java Mon Feb 13 21:32:42 2012
@@ -34,7 +34,7 @@ public class ChannelException extends Ru
/**
* @param ex the causal exception
*/
- public ChannelException(Exception ex) {
+ public ChannelException(Throwable ex) {
super(ex);
}
@@ -42,7 +42,7 @@ public class ChannelException extends Ru
* @param message the exception message
* @param ex the causal exception
*/
- public ChannelException(String message, Exception ex) {
+ public ChannelException(String message, Throwable ex) {
super(message, ex);
}
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/BasicChannelSemantics.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/BasicChannelSemantics.java?rev=1243703&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/BasicChannelSemantics.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/BasicChannelSemantics.java Mon Feb 13 21:32:42 2012
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.channel;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * <p>
+ * An implementation of basic {@link Channel} semantics, including the
+ * implied thread-local semantics of the {@link Transaction} class,
+ * which is required to extend {@link BasicTransactionSemantics}.
+ * </p>
+ */
+public abstract class BasicChannelSemantics extends AbstractChannel {
+
+  /**
+   * The transaction most recently created for each thread by
+   * {@link #getTransaction()}; unset for threads that have never
+   * requested one.
+   */
+  private final ThreadLocal<BasicTransactionSemantics> currentTransaction
+      = new ThreadLocal<BasicTransactionSemantics>();
+
+  /**
+   * Whether {@link #initialize()} has already run. This flag is read
+   * outside the synchronized block in {@link #getTransaction()}, so it
+   * must be volatile: otherwise the Java memory model does not
+   * guarantee that a thread observing <code>true</code> here also
+   * observes the state written by <code>initialize()</code>.
+   */
+  private volatile boolean initialized = false;
+
+  /**
+   * <p>
+   * Called upon first getTransaction() request, while synchronized on
+   * this {@link Channel} instance. Use this method to delay the
+   * initialization of resources until just before the first
+   * transaction begins. The default implementation does nothing.
+   * </p>
+   */
+  protected void initialize() {}
+
+  /**
+   * <p>
+   * Called to create new {@link Transaction} objects, which must
+   * extend {@link BasicTransactionSemantics}. Each object is used
+   * for only one transaction, but is stored in a thread-local and
+   * retrieved by <code>getTransaction</code> for the duration of that
+   * transaction.
+   * </p>
+   *
+   * @return a new transaction object for the calling thread
+   */
+  protected abstract BasicTransactionSemantics createTransaction();
+
+  /**
+   * <p>
+   * Delegates the <code>put</code> to the calling thread's {@link
+   * BasicTransactionSemantics} instance. A transaction must already
+   * have been obtained on this thread via {@link #getTransaction()}.
+   * </p>
+   *
+   * @param event the event to hand to the current transaction
+   * @throws IllegalStateException if no transaction exists for the
+   *         calling thread
+   */
+  @Override
+  public void put(Event event) throws ChannelException {
+    BasicTransactionSemantics transaction = currentTransaction.get();
+    Preconditions.checkState(transaction != null,
+        "No transaction exists for this thread");
+    transaction.put(event);
+  }
+
+  /**
+   * <p>
+   * Delegates the <code>take</code> to the calling thread's {@link
+   * BasicTransactionSemantics} instance. A transaction must already
+   * have been obtained on this thread via {@link #getTransaction()}.
+   * </p>
+   *
+   * @return whatever the current transaction's <code>take</code> returns
+   * @throws IllegalStateException if no transaction exists for the
+   *         calling thread
+   */
+  @Override
+  public Event take() throws ChannelException {
+    BasicTransactionSemantics transaction = currentTransaction.get();
+    Preconditions.checkState(transaction != null,
+        "No transaction exists for this thread");
+    return transaction.take();
+  }
+
+  /**
+   * <p>
+   * Initializes the channel if it is not already, then checks to see
+   * if there is an open transaction for this thread, creating a new
+   * one via <code>createTransaction</code> if not. A transaction whose
+   * state is <code>CLOSED</code> is treated as absent and replaced.
+   * </p>
+   *
+   * @return the current <code>Transaction</code> object for the
+   *         calling thread
+   */
+  @Override
+  public Transaction getTransaction() {
+
+    // Unsynchronized fast path; safe only because the flag is volatile.
+    if (!initialized) {
+      synchronized (this) {
+        // Re-check under the lock so initialize() runs at most once.
+        if (!initialized) {
+          initialize();
+          initialized = true;
+        }
+      }
+    }
+
+    BasicTransactionSemantics transaction = currentTransaction.get();
+    if (transaction == null || transaction.getState().equals(
+        BasicTransactionSemantics.State.CLOSED)) {
+      transaction = createTransaction();
+      currentTransaction.set(transaction);
+    }
+    return transaction;
+  }
+}
Propchange: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/BasicChannelSemantics.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/BasicTransactionSemantics.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/BasicTransactionSemantics.java?rev=1243703&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/BasicTransactionSemantics.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/BasicTransactionSemantics.java Mon Feb 13 21:32:42 2012
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.channel;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * <p>
+ * An implementation of basic {@link Transaction} semantics designed
+ * to work in concert with {@link BasicChannelSemantics} to simplify
+ * creation of robust {@link Channel} implementations. This class
+ * ensures that each transaction implementation method is called only
+ * while the transaction is in the correct state for that method, and
+ * only by the thread that created the transaction. Nested calls to
+ * <code>begin()</code> and <code>close()</code> are supported as long
+ * as they are balanced.
+ * </p>
+ * <p>
+ * Subclasses need only implement <code>doPut</code>,
+ * <code>doTake</code>, <code>doCommit</code>, and
+ * <code>doRollback</code>, and the developer can rest assured that
+ * those methods are called only after transaction state preconditions
+ * have been properly met. <code>doBegin</code> and
+ * <code>doClose</code> may also be implemented if there is work to be
+ * done at those points.
+ * </p>
+ * <p>
+ * All InterruptedException exceptions thrown from the implementations
+ * of the <code>doXXX</code> methods are automatically wrapped to
+ * become ChannelExceptions, but only after restoring the interrupted
+ * status of the thread so that any subsequent blocking method calls
+ * will themselves throw InterruptedException rather than blocking.
+ * The exception to this rule is <code>doTake</code>, which simply
+ * returns null instead of wrapping and propagating the
+ * InterruptedException, though it still first restores the
+ * interrupted status of the thread.
+ * </p>
+ */
+public abstract class BasicTransactionSemantics implements Transaction {
+
+  /** Current lifecycle state; advanced by begin/commit/rollback/close. */
+  private State state;
+
+  /** Id of the thread that constructed this transaction; never changes. */
+  private final long initialThreadId;
+
+  /**
+   * Hook invoked by {@link #begin()} before the state becomes
+   * <code>OPEN</code>. The default implementation does nothing.
+   */
+  protected void doBegin() throws InterruptedException {}
+
+  /**
+   * Hook invoked by {@link #put(Event)} once the thread, state and
+   * non-null checks have passed.
+   */
+  protected abstract void doPut(Event event) throws InterruptedException;
+
+  /**
+   * Hook invoked by {@link #take()} once the thread and state checks
+   * have passed. If it throws InterruptedException, <code>take()</code>
+   * returns null after restoring the thread's interrupted status.
+   */
+  protected abstract Event doTake() throws InterruptedException;
+
+  /**
+   * Hook invoked by {@link #commit()}; the state becomes
+   * <code>COMPLETED</code> only if this returns normally.
+   */
+  protected abstract void doCommit() throws InterruptedException;
+
+  /**
+   * Hook invoked by {@link #rollback()} after the state has already
+   * been set to <code>COMPLETED</code>.
+   */
+  protected abstract void doRollback() throws InterruptedException;
+
+  /**
+   * Hook invoked by {@link #close()} after the state has already been
+   * set to <code>CLOSED</code>. The default implementation does nothing.
+   */
+  protected void doClose() {}
+
+  /**
+   * Creates a transaction in the <code>NEW</code> state and records the
+   * constructing thread as the only thread allowed to use it.
+   */
+  protected BasicTransactionSemantics() {
+    state = State.NEW;
+    initialThreadId = Thread.currentThread().getId();
+  }
+
+  /**
+   * <p>
+   * The method to which {@link BasicChannelSemantics} delegates calls
+   * to <code>put</code>.
+   * </p>
+   *
+   * @param event the event to put; must not be null
+   * @throws IllegalStateException if called from another thread or
+   *         while the transaction is not <code>OPEN</code>
+   * @throws IllegalArgumentException if <code>event</code> is null
+   * @throws ChannelException wrapping an InterruptedException thrown
+   *         by <code>doPut</code>
+   */
+  protected void put(Event event) {
+    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
+        "put() called from different thread than getTransaction()!");
+    Preconditions.checkState(state.equals(State.OPEN),
+        "put() called when transaction is %s!", state);
+    Preconditions.checkArgument(event != null,
+        "put() called with null event!");
+
+    try {
+      doPut(event);
+    } catch (InterruptedException e) {
+      // Restore the interrupted status before translating the exception.
+      Thread.currentThread().interrupt();
+      throw new ChannelException(e.toString(), e);
+    }
+  }
+
+  /**
+   * <p>
+   * The method to which {@link BasicChannelSemantics} delegates calls
+   * to <code>take</code>.
+   * </p>
+   *
+   * @return the value returned by <code>doTake</code>, or null if
+   *         <code>doTake</code> was interrupted
+   * @throws IllegalStateException if called from another thread or
+   *         while the transaction is not <code>OPEN</code>
+   */
+  protected Event take() {
+    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
+        "take() called from different thread than getTransaction()!");
+    Preconditions.checkState(state.equals(State.OPEN),
+        "take() called when transaction is %s!", state);
+
+    try {
+      return doTake();
+    } catch (InterruptedException e) {
+      // Unlike the other operations, interruption is reported as
+      // "nothing available" rather than as a ChannelException.
+      Thread.currentThread().interrupt();
+      return null;
+    }
+  }
+
+  /**
+   * @return the current state of the transaction
+   */
+  protected State getState() {
+    return state;
+  }
+
+  /**
+   * Moves the transaction from <code>NEW</code> to <code>OPEN</code>
+   * after running <code>doBegin</code>. If <code>doBegin</code> throws,
+   * the state is left unchanged.
+   *
+   * @throws IllegalStateException if called from another thread or
+   *         while the transaction is not <code>NEW</code>
+   * @throws ChannelException wrapping an InterruptedException thrown
+   *         by <code>doBegin</code>
+   */
+  @Override
+  public void begin() {
+    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
+        "begin() called from different thread than getTransaction()!");
+    // Template form, as in the sibling methods: the message is only
+    // built when the check fails, and renders identically.
+    Preconditions.checkState(state.equals(State.NEW),
+        "begin() called when transaction is %s!", state);
+
+    try {
+      doBegin();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new ChannelException(e.toString(), e);
+    }
+    state = State.OPEN;
+  }
+
+  /**
+   * Runs <code>doCommit</code> and, only if it returns normally, marks
+   * the transaction <code>COMPLETED</code>. If <code>doCommit</code>
+   * throws, the state stays <code>OPEN</code>.
+   *
+   * @throws IllegalStateException if called from another thread or
+   *         while the transaction is not <code>OPEN</code>
+   * @throws ChannelException wrapping an InterruptedException thrown
+   *         by <code>doCommit</code>
+   */
+  @Override
+  public void commit() {
+    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
+        "commit() called from different thread than getTransaction()!");
+    Preconditions.checkState(state.equals(State.OPEN),
+        "commit() called when transaction is %s!", state);
+
+    try {
+      doCommit();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new ChannelException(e.toString(), e);
+    }
+    state = State.COMPLETED;
+  }
+
+  /**
+   * Marks the transaction <code>COMPLETED</code> and then runs
+   * <code>doRollback</code>. The state is changed first, so the
+   * transaction passes the state check in {@link #close()} even when
+   * <code>doRollback</code> throws.
+   *
+   * @throws IllegalStateException if called from another thread or
+   *         while the transaction is not <code>OPEN</code>
+   * @throws ChannelException wrapping an InterruptedException thrown
+   *         by <code>doRollback</code>
+   */
+  @Override
+  public void rollback() {
+    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
+        "rollback() called from different thread than getTransaction()!");
+    Preconditions.checkState(state.equals(State.OPEN),
+        "rollback() called when transaction is %s!", state);
+
+    state = State.COMPLETED;
+    try {
+      doRollback();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new ChannelException(e.toString(), e);
+    }
+  }
+
+  /**
+   * Marks the transaction <code>CLOSED</code> and then runs
+   * <code>doClose</code>. Permitted only for a transaction that was
+   * never begun (<code>NEW</code>) or that has been committed or
+   * rolled back (<code>COMPLETED</code>).
+   *
+   * @throws IllegalStateException if called from another thread or
+   *         while the transaction is <code>OPEN</code> or already
+   *         <code>CLOSED</code>
+   */
+  @Override
+  public void close() {
+    Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
+        "close() called from different thread than getTransaction()!");
+    Preconditions.checkState(
+        state.equals(State.NEW) || state.equals(State.COMPLETED),
+        "close() called when transaction is %s"
+        + " - you must either commit or rollback first", state);
+
+    state = State.CLOSED;
+    doClose();
+  }
+
+  /**
+   * @return a description containing the current state and the id of
+   *         the creating thread
+   */
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append("BasicTransactionSemantics: {");
+    builder.append(" state:").append(state);
+    builder.append(" initialThreadId:").append(initialThreadId);
+    builder.append(" }");
+    return builder.toString();
+  }
+
+  /**
+   * <p>
+   * The state of the {@link Transaction} to which it belongs.
+   * </p>
+   * <dl>
+   * <dt>NEW</dt>
+   * <dd>A newly created transaction that has not yet begun.</dd>
+   * <dt>OPEN</dt>
+   * <dd>A transaction that is open. It is permissible to commit or rollback.
+   * </dd>
+   * <dt>COMPLETED</dt>
+   * <dd>This transaction has been committed or rolled back. It is illegal to
+   * perform any further operations beyond closing it.</dd>
+   * <dt>CLOSED</dt>
+   * <dd>A closed transaction. No further operations are permitted.</dd>
+   * </dl>
+   */
+  protected static enum State {
+    NEW, OPEN, COMPLETED, CLOSED
+  }
+}
Propchange: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/BasicTransactionSemantics.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelUtils.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelUtils.java?rev=1243703&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelUtils.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelUtils.java Mon Feb 13 21:32:42 2012
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.channel;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * A collection of utilities for interacting with {@link Channel}
+ * objects. Use of these utilities prevents error-prone replication
+ * of required transaction usage semantics, and allows for more
+ * concise code.
+ * </p>
+ * <p>
+ * However, as a side-effect of its generality, and in particular of
+ * its use of {@link Callable}, any checked exceptions thrown by
+ * user-created transactors will be silently wrapped with {@link
+ * ChannelException} before being propagated. Only direct use of
+ * {@link #transact(Channel,Callable)} suffers from this issue, even
+ * though all other methods are based upon it, because none of the
+ * other methods are capable of producing or allowing checked
+ * exceptions in the first place.
+ * </p>
+ */
+public class ChannelUtils {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(ChannelUtils.class);
+
+  /**
+   * Upper bound on the list capacity pre-allocated by
+   * {@link #take(Channel,int)}; the list still grows up to
+   * <code>max</code> elements on demand.
+   */
+  private static final int MAX_INITIAL_CAPACITY = 64;
+
+  /**
+   * <p>
+   * A convenience method for single-event <code>put</code> transactions.
+   * </p>
+   *
+   * @param channel the channel to put to
+   * @param event the event to put
+   * @see #transact(Channel,Callable)
+   */
+  public static void put(final Channel channel, final Event event)
+      throws ChannelException {
+    transact(channel, new Runnable() {
+      @Override
+      public void run() {
+        channel.put(event);
+      }
+    });
+  }
+
+  /**
+   * <p>
+   * A convenience method for multiple-event <code>put</code> transactions.
+   * All events are put, in iteration order, within one transaction.
+   * </p>
+   *
+   * @param channel the channel to put to
+   * @param events the events to put
+   * @see #transact(Channel,Callable)
+   */
+  public static void put(final Channel channel, final Collection<Event> events)
+      throws ChannelException {
+    transact(channel, new Runnable() {
+      @Override
+      public void run() {
+        for (Event event : events) {
+          channel.put(event);
+        }
+      }
+    });
+  }
+
+  /**
+   * <p>
+   * A convenience method for single-event <code>take</code> transactions.
+   * </p>
+   *
+   * @param channel the channel to take from
+   * @return a single event, or null if the channel has none available
+   * @see #transact(Channel,Callable)
+   */
+  public static Event take(final Channel channel)
+      throws ChannelException {
+    return transact(channel, new Callable<Event>() {
+      @Override
+      public Event call() {
+        return channel.take();
+      }
+    });
+  }
+
+  /**
+   * <p>
+   * A convenience method for multiple-event <code>take</code> transactions.
+   * Taking stops early as soon as the channel returns null.
+   * </p>
+   *
+   * @param channel the channel to take from
+   * @param max the maximum number of events to take; must not be negative
+   * @return a list of at most <code>max</code> events
+   * @throws IllegalArgumentException if <code>max</code> is negative
+   * @see #transact(Channel,Callable)
+   */
+  public static List<Event> take(final Channel channel, final int max)
+      throws ChannelException {
+    return transact(channel, new Callable<List<Event>>() {
+      @Override
+      public List<Event> call() {
+        // Bound the up-front allocation: a very large max (for example
+        // Integer.MAX_VALUE to drain the channel) must not allocate a
+        // max-sized backing array before any event has been taken.
+        // A negative max still reaches the ArrayList constructor
+        // unchanged and is rejected with IllegalArgumentException.
+        List<Event> events =
+            new ArrayList<Event>(Math.min(max, MAX_INITIAL_CAPACITY));
+        while (events.size() < max) {
+          Event event = channel.take();
+          if (event == null) {
+            break;
+          }
+          events.add(event);
+        }
+        return events;
+      }
+    });
+  }
+
+  /**
+   * <p>
+   * A convenience method for transactions that don't require a return
+   * value. Simply wraps the <code>transactor</code> using {@link
+   * Executors#callable} and passes that to {@link
+   * #transact(Channel,Callable)}.
+   * </p>
+   *
+   * @param channel the channel supplying the transaction
+   * @param transactor the work to run inside the transaction
+   * @see #transact(Channel,Callable)
+   * @see Executors#callable(Runnable)
+   */
+  public static void transact(Channel channel, Runnable transactor)
+      throws ChannelException {
+    transact(channel, Executors.callable(transactor));
+  }
+
+  /**
+   * <p>
+   * A general optimistic implementation of {@link Transaction} client
+   * semantics. It gets a new transaction object from the
+   * <code>channel</code>, calls <code>begin()</code> on it, and then
+   * invokes the supplied <code>transactor</code> object. If an
+   * exception is thrown, then the transaction is rolled back;
+   * otherwise the transaction is committed and the value returned by
+   * the <code>transactor</code> is returned. In either case, the
+   * transaction is closed before the function exits. All secondary
+   * exceptions (i.e. those thrown by
+   * <code>Transaction.rollback()</code> or
+   * <code>Transaction.close()</code> while recovering from an earlier
+   * exception) are logged, allowing the original exception to be
+   * propagated instead.
+   * </p>
+   * <p>
+   * This implementation is optimistic in that it expects transaction
+   * rollback to be infrequent: it will rollback a transaction only
+   * when the supplied <code>transactor</code> throws an exception,
+   * and exceptions are a fairly heavyweight mechanism for handling
+   * frequently-occurring events.
+   * </p>
+   * <p>
+   * Errors and RuntimeExceptions are rethrown unchanged; any other
+   * Throwable is wrapped in a {@link ChannelException}. If the thread
+   * was interrupted at any point, its interrupted status is set again
+   * before this method exits.
+   * </p>
+   *
+   * @param channel the channel supplying the transaction
+   * @param transactor the work to run inside the transaction
+   * @return the value returned by <code>transactor.call()</code>
+   */
+  public static <T> T transact(Channel channel, Callable<T> transactor)
+      throws ChannelException {
+    Transaction transaction = channel.getTransaction();
+    boolean committed = false;
+    boolean interrupted = false;
+    try {
+      transaction.begin();
+      T value = transactor.call();
+      transaction.commit();
+      committed = true;
+      return value;
+    } catch (Throwable e) {
+      // Sample the interrupted status before attempting the rollback.
+      interrupted = Thread.currentThread().isInterrupted();
+      try {
+        transaction.rollback();
+      } catch (Throwable e2) {
+        // Secondary failure: log it so the original exception propagates.
+        logger.error("Failed to roll back transaction, exception follows:", e2);
+      }
+      if (e instanceof InterruptedException) {
+        // Falls through to the ChannelException wrap below; the status
+        // is re-asserted in the finally block.
+        interrupted = true;
+      } else if (e instanceof Error) {
+        throw (Error) e;
+      } else if (e instanceof RuntimeException) {
+        throw (RuntimeException) e;
+      }
+      throw new ChannelException(e);
+    } finally {
+      interrupted = interrupted || Thread.currentThread().isInterrupted();
+      try {
+        transaction.close();
+      } catch (Throwable e) {
+        if (committed) {
+          // No earlier exception is in flight, so a close failure is
+          // the primary error and must reach the caller.
+          if (e instanceof Error) {
+            throw (Error) e;
+          } else if (e instanceof RuntimeException) {
+            throw (RuntimeException) e;
+          } else {
+            throw new ChannelException(e);
+          }
+        } else {
+          logger.error(
+              "Failed to close transaction after error, exception follows:", e);
+        }
+      } finally {
+        if (interrupted) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+  }
+
+  /** Disallows instantiation */
+  private ChannelUtils() {}
+}
Propchange: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelUtils.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java?rev=1243703&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java Mon Feb 13 21:32:42 2012
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.channel;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.flume.ChannelException;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import com.google.common.base.Preconditions;
+
+// Shared scaffolding for channel-semantics tests: a queue-backed test
+// channel whose transaction hooks can be switched into failure modes,
+// plus helpers that run a Runnable and assert on the failure it raises.
+public abstract class AbstractBasicChannelSemanticsTest {
+
+ // Seven fixed events with bodies "test event 0" .. "test event 6",
+ // exposed as an unmodifiable list.
+ protected static List<Event> events;
+ static {
+ Event[] array = new Event[7];
+ for (int i = 0; i < array.length; ++i) {
+ array[i] = EventBuilder.withBody(("test event " + i).getBytes());
+ }
+ events = Collections.unmodifiableList(Arrays.asList(array));
+ }
+
+ // Both are created in before() and reset to null in after().
+ protected ExecutorService executor = null;
+ protected TestChannel channel = null;
+
+ // Channel under test: events live in an in-memory queue, and every
+ // transaction hook consults the channel's current Mode via doMode().
+ protected static class TestChannel extends BasicChannelSemantics {
+
+ private Queue<Event> queue = new ArrayDeque<Event>();
+
+ // What the transaction hooks do when invoked; see doMode().
+ public enum Mode {
+ NORMAL,
+ THROW_ERROR,
+ THROW_RUNTIME,
+ THROW_CHANNEL,
+ SLEEP
+ };
+
+ private Mode mode = Mode.NORMAL;
+ // Within this class these flags are only ever set to true (by the
+ // doCommit/doRollback/doClose hooks); nothing here resets them.
+ private boolean lastTransactionCommitted = false;
+ private boolean lastTransactionRolledBack = false;
+ private boolean lastTransactionClosed = false;
+
+ public Mode getMode() {
+ return mode;
+ }
+
+ public void setMode(Mode mode) {
+ this.mode = mode;
+ }
+
+ public boolean wasLastTransactionCommitted() {
+ return lastTransactionCommitted;
+ }
+
+ public boolean wasLastTransactionRolledBack() {
+ return lastTransactionRolledBack;
+ }
+
+ public boolean wasLastTransactionClosed() {
+ return lastTransactionClosed;
+ }
+
+ @Override
+ protected BasicTransactionSemantics createTransaction() {
+ return new TestTransaction();
+ }
+
+ // Transaction whose hooks first apply the channel's Mode and then, if
+ // nothing was thrown, perform the real queue operation or set a flag.
+ protected class TestTransaction extends BasicTransactionSemantics {
+
+ // Throws the throwable matching the current mode, or sleeps for
+ // 300000 ms in SLEEP mode; NORMAL matches no case and returns.
+ protected void doMode() throws InterruptedException {
+ switch (mode) {
+ case THROW_ERROR:
+ throw new TestError();
+ case THROW_RUNTIME:
+ throw new TestRuntimeException();
+ case THROW_CHANNEL:
+ throw new ChannelException("test");
+ case SLEEP:
+ Thread.sleep(300000);
+ break;
+ }
+ }
+
+ @Override
+ protected void doBegin() throws InterruptedException {
+ doMode();
+ }
+
+ @Override
+ protected void doPut(Event event) throws InterruptedException {
+ doMode();
+ synchronized (queue) {
+ queue.add(event);
+ }
+ }
+
+ @Override
+ protected Event doTake() throws InterruptedException {
+ doMode();
+ synchronized (queue) {
+ return queue.poll();
+ }
+ }
+
+ // The committed flag is set only if doMode() did not throw.
+ @Override
+ protected void doCommit() throws InterruptedException {
+ doMode();
+ lastTransactionCommitted = true;
+ }
+
+ // The rolled-back flag is set before doMode(), so it is recorded
+ // even when doMode() throws.
+ @Override
+ protected void doRollback() throws InterruptedException {
+ lastTransactionRolledBack = true;
+ doMode();
+ }
+
+ // The closed flag is set first; SLEEP mode is rejected up front
+ // because this hook cannot declare InterruptedException.
+ @Override
+ protected void doClose() {
+ lastTransactionClosed = true;
+ Preconditions.checkState(mode != TestChannel.Mode.SLEEP,
+ "doClose() can't throw InterruptedException, so why SLEEP?");
+ try {
+ doMode();
+ } catch (InterruptedException e) {
+ Assert.fail();
+ }
+ }
+ }
+ }
+
+ // Marker Error thrown by doMode() in THROW_ERROR mode.
+ protected static class TestError extends Error {
+ static final long serialVersionUID = -1;
+ };
+
+ // Marker RuntimeException thrown by doMode() in THROW_RUNTIME mode.
+ protected static class TestRuntimeException extends RuntimeException {
+ static final long serialVersionUID = -1;
+ };
+
+ // Runs test and requires it to throw an instance of exceptionClass.
+ // The AssertionError raised by Assert.fail() when test completes
+ // normally is caught by the same catch block; unless exceptionClass
+ // accepts it, it is rethrown wrapped in a new AssertionError.
+ // Special case: when InterruptedException is expected, a
+ // ChannelException caused by an InterruptedException is accepted,
+ // provided the thread's interrupted flag is set (Thread.interrupted()
+ // also clears that flag).
+ protected void testException(Class<? extends Throwable> exceptionClass,
+ Runnable test) {
+ try {
+ test.run();
+ Assert.fail();
+ } catch (Throwable e) {
+ if (exceptionClass == InterruptedException.class
+ && e instanceof ChannelException
+ && e.getCause() instanceof InterruptedException) {
+ Assert.assertTrue(Thread.interrupted());
+ } else if (!exceptionClass.isInstance(e)) {
+ throw new AssertionError(e);
+ }
+ }
+ }
+
+ // Requires test to throw IllegalArgumentException.
+ protected void testIllegalArgument(Runnable test) {
+ testException(IllegalArgumentException.class, test);
+ }
+
+ // Requires test to throw IllegalStateException.
+ protected void testIllegalState(Runnable test) {
+ testException(IllegalStateException.class, test);
+ }
+
+ // Runs testIllegalState(test) on an executor thread and waits for it
+ // to finish, so test executes on a thread other than the caller's.
+ protected void testWrongThread(final Runnable test) throws Exception {
+ executor.submit(new Runnable() {
+ public void run() {
+ testIllegalState(test);
+ }
+ }).get();
+ }
+
+ // Runs test with the channel switched to mode, restoring the previous
+ // mode afterwards even if test throws.
+ protected void testMode(TestChannel.Mode mode, Runnable test) {
+ TestChannel.Mode oldMode = channel.getMode();
+ try {
+ channel.setMode(mode);
+ test.run();
+ } finally {
+ channel.setMode(oldMode);
+ }
+ }
+
+ // Combines testMode and testException: under mode, test must throw an
+ // instance of exceptionClass.
+ protected void testException(TestChannel.Mode mode,
+ final Class<? extends Throwable> exceptionClass, final Runnable test) {
+ testMode(mode, new Runnable() {
+ public void run() {
+ testException(exceptionClass, test);
+ }
+ });
+ }
+
+ // Under THROW_ERROR mode, test must throw TestError.
+ protected void testError(Runnable test) {
+ testException(TestChannel.Mode.THROW_ERROR, TestError.class, test);
+ }
+
+ // Under THROW_RUNTIME mode, test must throw TestRuntimeException.
+ protected void testRuntimeException(Runnable test) {
+ testException(TestChannel.Mode.THROW_RUNTIME, TestRuntimeException.class,
+ test);
+ }
+
+ // Under THROW_CHANNEL mode, test must throw ChannelException.
+ protected void testChannelException(Runnable test) {
+ testException(TestChannel.Mode.THROW_CHANNEL, ChannelException.class, test);
+ }
+
+ // Under SLEEP mode, runs test via interruptTest and requires the
+ // outcome that testException accepts for InterruptedException.
+ protected void testInterrupt(final Runnable test) {
+ testMode(TestChannel.Mode.SLEEP, new Runnable() {
+ public void run() {
+ testException(InterruptedException.class, new Runnable() {
+ public void run() {
+ interruptTest(test);
+ }
+ });
+ }
+ });
+ }
+
+ // Submits a task that sleeps 500 ms and then interrupts the calling
+ // thread, runs test on the calling thread, then waits for the task.
+ // If the task's own sleep is interrupted it ignores that and still
+ // interrupts the caller. If test throws, the wait on the task is skipped.
+ protected void interruptTest(final Runnable test) {
+ final Thread mainThread = Thread.currentThread();
+ Future<?> future = executor.submit(new Runnable() {
+ public void run() {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ }
+ mainThread.interrupt();
+ }
+ });
+ test.run();
+ try {
+ future.get();
+ } catch (Exception e) {
+ throw new AssertionError(e);
+ }
+ }
+
+ // Runs the wrong-thread, basic-exception and interrupt checks on test.
+ protected void testExceptions(Runnable test) throws Exception {
+ testWrongThread(test);
+ testBasicExceptions(test);
+ testInterrupt(test);
+ }
+
+ // Runs the Error, RuntimeException and ChannelException checks on test.
+ protected void testBasicExceptions(Runnable test) throws Exception {
+ testError(test);
+ testRuntimeException(test);
+ testChannelException(test);
+ }
+
+ // Creates a fresh channel and thread pool, after verifying that the
+ // previous test's after() cleared both fields.
+ @Before
+ public void before() {
+ Preconditions.checkState(channel == null, "test cleanup failed!");
+ Preconditions.checkState(executor == null, "test cleanup failed!");
+ channel = new TestChannel();
+ executor = Executors.newCachedThreadPool();
+ }
+
+ // Drops the channel, requests executor shutdown (does not wait for
+ // running tasks) and clears both fields for the next before().
+ @After
+ public void after() {
+ channel = null;
+ executor.shutdown();
+ executor = null;
+ }
+}
Propchange: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/AbstractBasicChannelSemanticsTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestBasicChannelSemantics.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestBasicChannelSemantics.java?rev=1243703&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestBasicChannelSemantics.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestBasicChannelSemantics.java Mon Feb 13 21:32:42 2012
@@ -0,0 +1,736 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.channel;
+
+import java.util.concurrent.Future;
+
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestBasicChannelSemantics
+ extends AbstractBasicChannelSemanticsTest {
+
+ /**
+  * Puts every fixture event in its own committed transaction, then takes
+  * the events back one transaction at a time and checks that they come out
+  * in the order they were put.
+  */
+ @Test
+ public void testHappyPath() {
+   for (Event expected : events) {
+     Transaction putTransaction = channel.getTransaction();
+     putTransaction.begin();
+     channel.put(expected);
+     putTransaction.commit();
+     putTransaction.close();
+   }
+   for (Event expected : events) {
+     Transaction takeTransaction = channel.getTransaction();
+     takeTransaction.begin();
+     Event actual = channel.take();
+     Assert.assertSame(expected, actual);
+     takeTransaction.commit();
+     takeTransaction.close();
+   }
+ }
+
+ /**
+  * Runs a producer on a pool thread and a consumer on the test thread, each
+  * using one transaction per event, and checks that all {@code testLength}
+  * events arrive in order.
+  *
+  * The consumer keeps polling until it sees an empty channel after the
+  * producer has finished, so the result does not depend on the producer's
+  * trailing sleep being long enough for the consumer to catch up.
+  */
+ @Test
+ public void testMultiThreadedHappyPath() throws Exception {
+   final int testLength = 1000;
+   Future<?> producer = executor.submit(new Runnable() {
+     public void run() {
+       try {
+         Thread.sleep(500);
+         for (int i = 0; i < testLength; ++i) {
+           Transaction transaction = channel.getTransaction();
+           transaction.begin();
+           channel.put(events.get(i % events.size()));
+           transaction.commit();
+           transaction.close();
+           Thread.sleep(1);
+         }
+         // Keeps the consumer polling an idle channel for a while after
+         // the last put.
+         Thread.sleep(500);
+       } catch (InterruptedException e) {
+         Assert.fail();
+       }
+     }
+   });
+   int taken = 0;
+   boolean drained = false;
+   while (!drained) {
+     // Sample completion BEFORE taking: an empty take that follows a
+     // finished producer proves that nothing is left in the channel.
+     boolean producerDone = producer.isDone();
+     Transaction transaction = channel.getTransaction();
+     transaction.begin();
+     Event event = channel.take();
+     if (event != null) {
+       Assert.assertSame(events.get(taken % events.size()), event);
+       ++taken;
+     } else if (producerDone) {
+       drained = true;
+     }
+     transaction.commit();
+     transaction.close();
+   }
+   // Surface a producer failure with its real cause before checking the
+   // count, which would otherwise fail first with a misleading message.
+   producer.get();
+   Assert.assertEquals(testLength, taken);
+ }
+
+ /**
+  * Checks which transaction getTransaction() hands out at each stage of a
+  * transaction's life. A pool thread must never see this thread's
+  * transaction. This thread keeps getting the same instance before begin,
+  * after begin and after commit, and a different one after close.
+  */
+ @Test
+ public void testGetTransaction() throws Exception {
+   final Transaction transaction = channel.getTransaction();
+   final Runnable assertNotVisibleElsewhere = new Runnable() {
+     public void run() {
+       Assert.assertNotSame(transaction, channel.getTransaction());
+     }
+   };
+
+   // Not yet begun.
+   executor.submit(assertNotVisibleElsewhere).get();
+   Assert.assertSame(transaction, channel.getTransaction());
+
+   transaction.begin();
+   executor.submit(assertNotVisibleElsewhere).get();
+   Assert.assertSame(transaction, channel.getTransaction());
+
+   transaction.commit();
+   executor.submit(assertNotVisibleElsewhere).get();
+   Assert.assertSame(transaction, channel.getTransaction());
+
+   transaction.close();
+   executor.submit(assertNotVisibleElsewhere).get();
+   Assert.assertNotSame(transaction, channel.getTransaction());
+ }
+
+ /**
+  * Exercises begin() across a transaction's life. On a fresh transaction it
+  * goes through the testExceptions scenarios. After a real begin, and again
+  * after commit and after close, it goes through testIllegalState.
+  */
+ @Test
+ public void testBegin() throws Exception {
+   final Transaction transaction = channel.getTransaction();
+   final Runnable begin = new Runnable() {
+     public void run() {
+       transaction.begin();
+     }
+   };
+
+   testExceptions(begin);
+
+   transaction.begin();
+   testIllegalState(begin);
+
+   transaction.commit();
+   testIllegalState(begin);
+
+   transaction.close();
+   testIllegalState(begin);
+ }
+
+ /**
+  * Exercises put() across a transaction's life. It goes through
+  * testIllegalState before a transaction exists, before begin, after commit
+  * and after close. Inside an open transaction a plain put is made, a null
+  * put goes through testIllegalArgument and a normal put goes through
+  * testExceptions.
+  */
+ @Test
+ public void testPut1() throws Exception {
+   final Runnable putFirstEvent = new Runnable() {
+     public void run() {
+       channel.put(events.get(0));
+     }
+   };
+
+   testIllegalState(putFirstEvent);
+
+   Transaction transaction = channel.getTransaction();
+   testIllegalState(putFirstEvent);
+
+   transaction.begin();
+   channel.put(events.get(0));
+
+   Runnable putNull = new Runnable() {
+     public void run() {
+       channel.put(null);
+     }
+   };
+   testIllegalArgument(putNull);
+   testExceptions(putFirstEvent);
+
+   transaction.commit();
+   testIllegalState(putFirstEvent);
+
+   transaction.close();
+   testIllegalState(putFirstEvent);
+ }
+
+ /**
+  * Puts one event, rolls the transaction back, then runs put() through
+  * testIllegalState both before and after the transaction is closed.
+  */
+ @Test
+ public void testPut2() throws Exception {
+   final Runnable putFirstEvent = new Runnable() {
+     public void run() {
+       channel.put(events.get(0));
+     }
+   };
+
+   Transaction transaction = channel.getTransaction();
+   transaction.begin();
+   channel.put(events.get(0));
+   transaction.rollback();
+   testIllegalState(putFirstEvent);
+
+   transaction.close();
+   testIllegalState(putFirstEvent);
+ }
+
+ /**
+  * Puts one event, runs commit() through testChannelException, rolls back,
+  * then runs put() through testIllegalState both before and after the
+  * transaction is closed.
+  */
+ @Test
+ public void testPut3() throws Exception {
+   final Transaction transaction = channel.getTransaction();
+   final Runnable commit = new Runnable() {
+     public void run() {
+       transaction.commit();
+     }
+   };
+   final Runnable putFirstEvent = new Runnable() {
+     public void run() {
+       channel.put(events.get(0));
+     }
+   };
+
+   transaction.begin();
+   channel.put(events.get(0));
+   testChannelException(commit);
+
+   transaction.rollback();
+   testIllegalState(putFirstEvent);
+
+   transaction.close();
+   testIllegalState(putFirstEvent);
+ }
+
+ /**
+  * Exercises take() across a transaction's life. It goes through
+  * testIllegalState before a transaction exists, before begin, after commit
+  * and after close. Inside an open transaction it checks an empty take
+  * (null), a take after 1000 puts (non-null), the testWrongThread and
+  * testBasicExceptions scenarios, and an interrupted take in SLEEP mode,
+  * which must return null and leave the thread's interrupt flag set.
+  */
+ @Test
+ public void testTake1() throws Exception {
+   final Runnable take = new Runnable() {
+     public void run() {
+       channel.take();
+     }
+   };
+
+   testIllegalState(take);
+
+   Transaction transaction = channel.getTransaction();
+   testIllegalState(take);
+
+   transaction.begin();
+   Assert.assertNull(channel.take());
+
+   for (int n = 0; n < 1000; ++n) {
+     channel.put(events.get(n % events.size()));
+   }
+   Assert.assertNotNull(channel.take());
+
+   testWrongThread(take);
+   testBasicExceptions(take);
+
+   final Runnable interruptedTake = new Runnable() {
+     public void run() {
+       Assert.assertNull(channel.take());
+       // Thread.interrupted() also clears the flag for what follows.
+       Assert.assertTrue(Thread.interrupted());
+     }
+   };
+   testMode(TestChannel.Mode.SLEEP, new Runnable() {
+     public void run() {
+       interruptTest(interruptedTake);
+     }
+   });
+
+   Assert.assertNotNull(channel.take());
+
+   transaction.commit();
+   testIllegalState(take);
+
+   transaction.close();
+   testIllegalState(take);
+ }
+
+ /**
+  * Takes once, rolls the transaction back, then runs take() through
+  * testIllegalState both before and after the transaction is closed.
+  */
+ @Test
+ public void testTake2() throws Exception {
+   final Runnable take = new Runnable() {
+     public void run() {
+       channel.take();
+     }
+   };
+
+   Transaction transaction = channel.getTransaction();
+   transaction.begin();
+   channel.take();
+   transaction.rollback();
+   testIllegalState(take);
+
+   transaction.close();
+   testIllegalState(take);
+ }
+
+ /**
+  * Takes once, runs commit() through testChannelException, rolls back,
+  * then runs take() through testIllegalState both before and after the
+  * transaction is closed.
+  */
+ @Test
+ public void testTake3() throws Exception {
+   final Transaction transaction = channel.getTransaction();
+   final Runnable commit = new Runnable() {
+     public void run() {
+       transaction.commit();
+     }
+   };
+   final Runnable take = new Runnable() {
+     public void run() {
+       channel.take();
+     }
+   };
+
+   transaction.begin();
+   channel.take();
+   testChannelException(commit);
+
+   transaction.rollback();
+   testIllegalState(take);
+
+   transaction.close();
+   testIllegalState(take);
+ }
+
+ /**
+  * Exercises commit() across a transaction's life. It goes through
+  * testIllegalState before begin, testExceptions while the transaction is
+  * open, and testIllegalState again after a real commit and after close.
+  */
+ @Test
+ public void testCommit1() throws Exception {
+   final Transaction transaction = channel.getTransaction();
+   final Runnable commit = new Runnable() {
+     public void run() {
+       transaction.commit();
+     }
+   };
+
+   testIllegalState(commit);
+
+   transaction.begin();
+   testExceptions(commit);
+
+   transaction.commit();
+   testIllegalState(commit);
+
+   transaction.close();
+   testIllegalState(commit);
+ }
+
+ /**
+  * Begins and rolls back a transaction, then runs commit() through
+  * testIllegalState both before and after the transaction is closed.
+  */
+ @Test
+ public void testCommit2() throws Exception {
+   final Transaction transaction = channel.getTransaction();
+   final Runnable commit = new Runnable() {
+     public void run() {
+       transaction.commit();
+     }
+   };
+
+   transaction.begin();
+   transaction.rollback();
+   testIllegalState(commit);
+
+   transaction.close();
+   testIllegalState(commit);
+ }
+
+ /**
+  * Exercises rollback() across a transaction's life. It goes through
+  * testIllegalState before begin, testWrongThread while the transaction is
+  * open, and testIllegalState again after a real rollback and after close.
+  */
+ @Test
+ public void testRollback1() throws Exception {
+   final Transaction transaction = channel.getTransaction();
+   final Runnable rollback = new Runnable() {
+     public void run() {
+       transaction.rollback();
+     }
+   };
+
+   testIllegalState(rollback);
+
+   transaction.begin();
+   testWrongThread(rollback);
+
+   transaction.rollback();
+   testIllegalState(rollback);
+
+   transaction.close();
+   testIllegalState(rollback);
+ }
+
+ /**
+  * Runs rollback() through testIllegalState before begin, through the
+  * testError scenario while the transaction is open, and through
+  * testIllegalState again straight afterwards and after close.
+  */
+ @Test
+ public void testRollback2() throws Exception {
+   final Transaction transaction = channel.getTransaction();
+   final Runnable rollback = new Runnable() {
+     public void run() {
+       transaction.rollback();
+     }
+   };
+
+   testIllegalState(rollback);
+
+   transaction.begin();
+   testError(rollback);
+   testIllegalState(rollback);
+
+   transaction.close();
+   testIllegalState(rollback);
+ }
+
+ /**
+  * Runs rollback() through testIllegalState before begin, through the
+  * testRuntimeException scenario while the transaction is open, and
+  * through testIllegalState again straight afterwards and after close.
+  */
+ @Test
+ public void testRollback3() throws Exception {
+   final Transaction transaction = channel.getTransaction();
+   final Runnable rollback = new Runnable() {
+     public void run() {
+       transaction.rollback();
+     }
+   };
+
+   testIllegalState(rollback);
+
+   transaction.begin();
+   testRuntimeException(rollback);
+   testIllegalState(rollback);
+
+   transaction.close();
+   testIllegalState(rollback);
+ }
+
+ /**
+  * Runs rollback() through testIllegalState before begin, through the
+  * testChannelException scenario while the transaction is open, and
+  * through testIllegalState again straight afterwards and after close.
+  */
+ @Test
+ public void testRollback4() throws Exception {
+   final Transaction transaction = channel.getTransaction();
+   final Runnable rollback = new Runnable() {
+     public void run() {
+       transaction.rollback();
+     }
+   };
+
+   testIllegalState(rollback);
+
+   transaction.begin();
+   testChannelException(rollback);
+   testIllegalState(rollback);
+
+   transaction.close();
+   testIllegalState(rollback);
+ }
+
+
+ /**
+  * Runs rollback() through testIllegalState before begin, through the
+  * testInterrupt scenario while the transaction is open, and through
+  * testIllegalState again straight afterwards and after close.
+  */
+ @Test
+ public void testRollback5() throws Exception {
+   final Transaction transaction = channel.getTransaction();
+   final Runnable rollback = new Runnable() {
+     public void run() {
+       transaction.rollback();
+     }
+   };
+
+   testIllegalState(rollback);
+
+   transaction.begin();
+   testInterrupt(rollback);
+   testIllegalState(rollback);
+
+   transaction.close();
+   testIllegalState(rollback);
+ }
+
+ /**
+  * Begins and commits a transaction, then runs rollback() through
+  * testIllegalState both before and after the transaction is closed.
+  */
+ @Test
+ public void testRollback6() throws Exception {
+   final Transaction transaction = channel.getTransaction();
+   final Runnable rollback = new Runnable() {
+     public void run() {
+       transaction.rollback();
+     }
+   };
+
+   transaction.begin();
+   transaction.commit();
+   testIllegalState(rollback);
+
+   transaction.close();
+   testIllegalState(rollback);
+ }
+
+ /**
+  * Runs commit() through testExceptions on an open transaction, rolls the
+  * transaction back for real, then runs rollback() through
+  * testIllegalState both before and after the transaction is closed.
+  */
+ @Test
+ public void testRollback7() throws Exception {
+   final Transaction transaction = channel.getTransaction();
+   final Runnable commit = new Runnable() {
+     public void run() {
+       transaction.commit();
+     }
+   };
+   final Runnable rollback = new Runnable() {
+     public void run() {
+       transaction.rollback();
+     }
+   };
+
+   transaction.begin();
+   testExceptions(commit);
+
+   transaction.rollback();
+   testIllegalState(rollback);
+
+   transaction.close();
+   testIllegalState(rollback);
+ }
+
+ /**
+  * Runs close() on a never-begun transaction through the testError
+  * scenario, then through testIllegalState.
+  */
+ @Test
+ public void testClose1() throws Exception {
+   final Transaction transaction = channel.getTransaction();
+   final Runnable close = new Runnable() {
+     public void run() {
+       transaction.close();
+     }
+   };
+
+   testError(close);
+   testIllegalState(close);
+ }
+
+ /**
+  * Runs close() on a never-begun transaction through the
+  * testRuntimeException scenario, then through testIllegalState.
+  */
+ @Test
+ public void testClose2() throws Exception {
+   final Transaction transaction = channel.getTransaction();
+   final Runnable close = new Runnable() {
+     public void run() {
+       transaction.close();
+     }
+   };
+
+   testRuntimeException(close);
+   testIllegalState(close);
+ }
+
+ /**
+  * Runs close() on a never-begun transaction through the
+  * testChannelException scenario, then through testIllegalState.
+  */
+ @Test
+ public void testClose3() throws Exception {
+   final Transaction transaction = channel.getTransaction();
+   final Runnable close = new Runnable() {
+     public void run() {
+       transaction.close();
+     }
+   };
+
+   testChannelException(close);
+   testIllegalState(close);
+ }
+
+ /**
+  * Runs close() through testIllegalState on a transaction that has been
+  * begun but neither committed nor rolled back.
+  */
+ @Test
+ public void testClose4() throws Exception {
+   final Transaction transaction = channel.getTransaction();
+   final Runnable close = new Runnable() {
+     public void run() {
+       transaction.close();
+     }
+   };
+
+   transaction.begin();
+   testIllegalState(close);
+ }
+
+ /**
+  * Begins a transaction, runs commit() through the testChannelException
+  * scenario, then runs close() through testIllegalState.
+  */
+ @Test
+ public void testClose5() throws Exception {
+   final Transaction transaction = channel.getTransaction();
+   final Runnable commit = new Runnable() {
+     public void run() {
+       transaction.commit();
+     }
+   };
+   final Runnable close = new Runnable() {
+     public void run() {
+       transaction.close();
+     }
+   };
+
+   transaction.begin();
+   testChannelException(commit);
+   testIllegalState(close);
+ }
+}
Propchange: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestBasicChannelSemantics.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelUtils.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelUtils.java?rev=1243703&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelUtils.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelUtils.java Mon Feb 13 21:32:42 2012
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.channel;
+
+import java.util.List;
+
+import org.apache.flume.ChannelException;
+import org.apache.flume.Event;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestChannelUtils
+ extends AbstractBasicChannelSemanticsTest {
+
+ @Test
+ public void testHappyPath1() {
+ ChannelUtils.put(channel, events.get(0));
+ Assert.assertTrue(channel.wasLastTransactionCommitted());
+ Assert.assertFalse(channel.wasLastTransactionRolledBack());
+ Assert.assertTrue(channel.wasLastTransactionClosed());
+ }
+
+ @Test
+ public void testHappyPath2() {
+ ChannelUtils.take(channel);
+ Assert.assertTrue(channel.wasLastTransactionCommitted());
+ Assert.assertFalse(channel.wasLastTransactionRolledBack());
+ Assert.assertTrue(channel.wasLastTransactionClosed());
+ }
+
+ @Test
+ public void testHappyPath3() {
+ ChannelUtils.put(channel, events.get(0));
+ Assert.assertSame(events.get(0), ChannelUtils.take(channel));
+ }
+
+ @Test
+ public void testHappyPath4() {
+ for (int i = 0; i < events.size(); ++i) {
+ ChannelUtils.put(channel, events.get(i));
+ }
+ for (int i = 0; i < events.size(); ++i) {
+ Assert.assertSame(events.get(i), ChannelUtils.take(channel));
+ }
+ }
+
+ @Test
+ public void testHappyPath5() {
+ int rounds = 10;
+ for (int i = 0; i < rounds; ++i) {
+ ChannelUtils.put(channel, events);
+ }
+ for (int i = 0; i < rounds; ++i) {
+ List<Event> takenEvents = ChannelUtils.take(channel, events.size());
+ Assert.assertTrue(takenEvents.size() == events.size());
+ for (int j = 0; j < events.size(); ++j) {
+ Assert.assertSame(events.get(j), takenEvents.get(j));
+ }
+ }
+ }
+
+ private void testTransact(final TestChannel.Mode mode,
+ Class<? extends Throwable> exceptionClass, final Runnable test) {
+ testException(exceptionClass, new Runnable() {
+ public void run() {
+ ChannelUtils.transact(channel, new Runnable() {
+ public void run() {
+ testMode(mode, test);
+ }
+ });
+ }
+ });
+ Assert.assertFalse(channel.wasLastTransactionCommitted());
+ Assert.assertTrue(channel.wasLastTransactionRolledBack());
+ Assert.assertTrue(channel.wasLastTransactionClosed());
+ }
+
+ private void testTransact(TestChannel.Mode mode,
+ Class<? extends Throwable> exceptionClass) {
+ testTransact(mode, exceptionClass, new Runnable() {
+ public void run() {
+ channel.put(events.get(0));
+ }
+ });
+ }
+
+ @Test
+ public void testError() {
+ testTransact(TestChannel.Mode.THROW_ERROR, TestError.class);
+ }
+
+ @Test
+ public void testRuntimeException() {
+ testTransact(TestChannel.Mode.THROW_RUNTIME, TestRuntimeException.class);
+ }
+
+ @Test
+ public void testChannelException() {
+ testTransact(TestChannel.Mode.THROW_CHANNEL, ChannelException.class);
+ }
+
+ @Test
+ public void testInterrupt() throws Exception {
+ testTransact(TestChannel.Mode.SLEEP, InterruptedException.class,
+ new Runnable() {
+ public void run() {
+ interruptTest(new Runnable() {
+ public void run() {
+ channel.put(events.get(0));
+ }
+ });
+ }
+ });
+ }
+}
Propchange: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestChannelUtils.java
------------------------------------------------------------------------------
svn:eol-style = native