You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2011/10/18 00:35:52 UTC
svn commit: r1185412 - in /incubator/flume/branches/flume-728:
flume-ng-core/src/main/java/org/apache/flume/channel/
flume-ng-core/src/test/java/org/apache/flume/channel/
flume-ng-core/src/test/java/org/apache/flume/sink/
flume-ng-core/src/test/java/or...
Author: arvind
Date: Mon Oct 17 22:35:52 2011
New Revision: 1185412
URL: http://svn.apache.org/viewvc?rev=1185412&view=rev
Log:
FLUME-722. Transactinal memory channel implementation.
(Prasad Mujumdar via Arvind Prabhakar)
Added:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java
Removed:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MultiOpMemChannel.java
Modified:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java?rev=1185412&r1=1185411&r2=1185412&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java Mon Oct 17 22:35:52 2011
@@ -17,150 +17,335 @@
*/
package org.apache.flume.channel;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.LinkedList;
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
+//import org.apache.flume.channel.MemoryChannel.MemTransaction;
import org.apache.flume.conf.Configurable;
import com.google.common.base.Preconditions;
/**
- * <p>
- * A capacity-capped {@link Channel} implementation that supports in-memory
- * buffering and delivery of events.
- * </p>
- * <p>
- * This channel is appropriate for
- * <q>best effort</q> delivery of events where high throughput is favored over
- * data durability. To be clear, <b>this channel offers absolutely no guarantee
- * of event delivery</b> in the face of (any) component failure.
- * </p>
- * <p>
- * TODO: Discuss guarantees, corner cases re: potential data loss (e.g. consumer
- * begins a tx, takes events, and gets SIGKILL before rollback).
- * </p>
- * <p>
- * <b>Configuration options</b>
- * </p>
- * <table>
- * <tr>
- * <th>Parameter</th>
- * <th>Description</th>
- * <th>Unit / Type</th>
- * <th>Default</th>
- * </tr>
- * <tr>
- * <td><tt>capacity</tt></td>
- * <td>The in-memory capacity of this channel. Store up to <tt>capacity</tt>
- * events before refusing new events.</td>
- * <td>events / int</td>
- * <td>50</td>
- * </tr>
- * <tr>
- * <td><tt>keep-alive</tt></td>
- * <td>The amount of time (seconds) to wait for an event before returning
- * <tt>null</tt> on {@link #take()}.</td>
- * <td>seconds / int</td>
- * <td>3</td>
- * </tr>
- * </table>
- * <p>
- * <b>Metrics</b>
- * </p>
- * <p>
- * TODO
- * </p>
+ * Memory channel that with full transaction support
+ * Uses transaction object for each thread (source and sink) attached to channel.
+ * The events are stored in the thread safe Dequeue. * The put and take are directly
+ * executed in the common queue. Channel has a marker for the last committed event in
+ * order to avoid sink reading uncommitted data.
+ * The transactions keep track of the actions to perform undo when rolled back.
+ *
*/
public class MemoryChannel implements Channel, Configurable {
private static final Integer defaultCapacity = 50;
private static final Integer defaultKeepAlive = 3;
- private BlockingQueue<Event> queue;
+ // wrap the event with a counter
+ private class StampedEvent {
+ private int timeStamp;
+ private Event event;
+
+ public StampedEvent (int stamp, Event E) {
+ timeStamp = stamp;
+ event = E;
+ }
+
+ public int getStamp() {
+ return timeStamp;
+ }
+
+ public Event getEvent() {
+ return event;
+ }
+
+ }
+
+ /* transaction class
+ * maintain a 'undo' list for put/take from the queue.
+ * The rollback performs undo of the operations using these lists.
+ * Also maintain a stamp/counter for commit and last take.
+ * This is used to ensure that a transaction doesn't read
+ * uncommitted events.
+ */
+ public class MemTransaction implements Transaction {
+ private int putStamp;
+ private int takeStamp;
+ private LinkedList<StampedEvent> undoTakeList;
+ private LinkedList<StampedEvent> undoPutList;
+ private TransactionState txnState;
+
+ public MemTransaction () {
+ txnState = TransactionState.Closed;
+ }
+
+ @Override
+ /**
+ * Start the transaction
+ * initialize the undo lists, stamps
+ * set transaction state to Started
+ */
+ public void begin() {
+ undoTakeList = new LinkedList<StampedEvent>();
+ undoPutList = new LinkedList<StampedEvent> ();
+ putStamp = 0;
+ takeStamp = 0;
+ txnState = TransactionState.Started;
+ }
+
+ @Override
+ /**
+ * Commit the transaction
+ * If there was an event added by this transaction, then set the
+ * commit stamp set transaction state to Committed
+ */
+ public void commit() {
+ Preconditions.checkArgument(txnState == TransactionState.Started,
+ "transaction not started");
+ // if the txn put any events, then update the channel's stamp and
+ // signal for availability of committed data in the queue
+ if (putStamp != 0) {
+ lastCommitStamp.set(putStamp);
+ lock.lock();
+ hasData.signal();
+ lock.unlock();
+ }
+ txnState = TransactionState.Committed;
+ undoPutList.clear();
+ undoTakeList.clear();
+ }
+
+ @Override
+ /**
+ * Rollback the transaction
+ * execute the channel's undoXXX to undo the actions done by this txn
+ * set transaction state to rolled back
+ */
+ public void rollback() {
+ Preconditions.checkArgument(txnState == TransactionState.Started,
+ "transaction not started");
+ undoPut(this);
+ undoTake(this);
+ txnState = TransactionState.RolledBack;
+ }
+
+ @Override
+ /**
+ * Close the transaction
+ * if the transaction is still open, then roll it back
+ * set transaction state to Closed
+ */
+ public void close() {
+ if (txnState == TransactionState.Started) {
+ rollback();
+ }
+ txnState = TransactionState.Closed;
+ forgetTransaction(this);
+ }
+
+ protected int lastTakeStamp() {
+ return takeStamp;
+ }
+
+ protected void logPut(StampedEvent e, int stamp) {
+ undoPutList.addLast(e);
+ putStamp = stamp;
+ }
+
+ protected void logTake(StampedEvent e, int stamp) {
+ undoTakeList.addLast(e);
+ takeStamp = stamp;
+ }
+
+ protected StampedEvent removePut() {
+ if (undoPutList.isEmpty()) {
+ return null;
+ } else {
+ return undoPutList.removeLast();
+ }
+ }
+
+ protected StampedEvent removeTake() {
+ if (undoTakeList.isEmpty()) {
+ return null;
+ } else {
+ return undoTakeList.removeLast();
+ }
+ }
+
+ }
+
+ // The main event queue
+ private LinkedBlockingDeque<StampedEvent> queue;
+
+ private AtomicInteger currentStamp; // operation counter
+ private AtomicInteger lastCommitStamp; // counter for the last commit
+ private ConcurrentHashMap<Long, MemTransaction> txnMap; // open transactions
private Integer keepAlive;
+ final Lock lock = new ReentrantLock();
+ final Condition hasData = lock.newCondition();
+ /**
+ * Channel constructor
+ */
+ public MemoryChannel() {
+ currentStamp = new AtomicInteger(1);
+ lastCommitStamp = new AtomicInteger(0);
+ txnMap = new ConcurrentHashMap<Long, MemTransaction>();
+ }
+
+ /**
+ * set the event queue capacity
+ */
@Override
public void configure(Context context) {
- Integer capacity = context.get("capacity", Integer.class);
- keepAlive = context.get("keep-alive", Integer.class);
+ Integer capacity = context.get("capacity", Integer.class);
if (capacity == null) {
capacity = defaultCapacity;
}
+ keepAlive = context.get("keep-alive", Integer.class);
if (keepAlive == null) {
keepAlive = defaultKeepAlive;
}
- queue = new ArrayBlockingQueue<Event>(capacity);
+ queue = new LinkedBlockingDeque<StampedEvent>(capacity);
}
-
+
@Override
+ /**
+ * Add the given event to the end of the queue
+ * save the event in the undoPut queue for possible rollback
+ * save the stamp of this put for commit
+ */
public void put(Event event) {
Preconditions.checkState(queue != null,
"No queue defined (Did you forget to configure me?");
try {
- queue.put(event);
+ MemTransaction myTxn = findTransaction();
+ Preconditions.checkState(myTxn != null, "Transaction not started");
+
+ int myStamp = currentStamp.getAndIncrement();
+ StampedEvent stampedEvent = new StampedEvent(myStamp, event);
+ queue.put(stampedEvent);
+ myTxn.logPut(stampedEvent, myStamp);
+
} catch (InterruptedException ex) {
throw new ChannelException("Failed to put(" + event + ")", ex);
}
}
+ /**
+ * undo of put
+ * for all the events in the undoPut queue, remove those from the event queue
+ * @param myTxn
+ */
+ protected void undoPut(MemTransaction myTxn ) {
+ StampedEvent undoEvent;
+ StampedEvent currentEvent;
+
+ while ((undoEvent = myTxn.removePut()) != null) {
+ currentEvent = queue.removeLast();
+ Preconditions.checkNotNull(currentEvent, "Rollback error");
+ Preconditions.checkArgument(currentEvent == undoEvent ,
+ "Rollback error");
+ }
+ }
+
@Override
+ /**
+ * remove the event from the top of the queue and return it
+ * also add that event to undoTake queue for possible rollback
+ */
public Event take() {
- Preconditions.checkState(queue != null,
- "No queue defined (Did you forget to configure me?");
+ Preconditions.checkState(queue != null, "Queue not configured");
try {
- return queue.poll(keepAlive, TimeUnit.SECONDS);
+ MemTransaction myTxn = findTransaction();
+ Preconditions.checkState(myTxn != null, "Transaction not started");
+ Event event = null;
+ int timeout = keepAlive;
+
+ // wait for some committed data be there in the queue
+ if ((timeout > 0) && (myTxn.lastTakeStamp() == lastCommitStamp.get())) {
+ lock.lock();
+ hasData.await(timeout, TimeUnit.SECONDS);
+ lock.unlock();
+ timeout = 0; // don't wait any further
+ }
+
+ // don't go past the last committed element
+ if (myTxn.lastTakeStamp() != lastCommitStamp.get()) {
+ StampedEvent e = queue.poll(timeout, TimeUnit.SECONDS);
+ if (e != null) {
+ myTxn.logTake(e, e.getStamp());
+ event = e.getEvent();
+ }
+ }
+ return event;
} catch (InterruptedException ex) {
throw new ChannelException("Failed to take()", ex);
}
}
- @Override
- public Transaction getTransaction() {
- return NoOpTransaction.sharedInstance();
- }
-
/**
- * <p>
- * A no-op transaction implementation that does nothing at all.
- * </p>
+ * undo of take operation
+ * for each event in the undoTake list, add it back to the event queue
+ * @param myTxn
*/
- public static class NoOpTransaction implements Transaction {
-
- private static NoOpTransaction sharedInstance;
-
- public static Transaction sharedInstance() {
- if (sharedInstance == null) {
- sharedInstance = new NoOpTransaction();
- }
+ protected void undoTake(MemTransaction myTxn) {
+ StampedEvent e;
- return sharedInstance;
- }
-
- @Override
- public void begin() {
- }
-
- @Override
- public void commit() {
+ while ((e = myTxn.removeTake()) != null) {
+ queue.addFirst(e);
}
+ }
+
+ @Override
+ /**
+ * Return the channel's transaction
+ */
+ public Transaction getTransaction() {
+ MemTransaction txn;
- @Override
- public void rollback() {
+ // check if there's already a transaction created for this thread
+ txn = findTransaction();
+
+ // Create a new transaction
+ if (txn == null) {
+ txn = new MemTransaction();
+ txnMap.put(Thread.currentThread().getId(), txn);
}
+ return txn;
+ }
- @Override
- public void close() {
+ /**
+ * Remove the given transaction from the list of open transactions
+ * @param myTxn
+ */
+ protected void forgetTransaction(MemTransaction myTxn) {
+ MemTransaction currTxn = findTransaction();
+ Preconditions.checkArgument(myTxn == currTxn, "Wrong transaction to close");
+ txnMap.remove(Thread.currentThread().getId());
+ }
+
+ // lookup the transaction for the current thread
+ protected MemTransaction findTransaction() {
+ try {
+ return txnMap.get(Thread.currentThread().getId());
+ } catch (NullPointerException eN) {
+ return null;
}
}
@@ -176,3 +361,4 @@ public class MemoryChannel implements Ch
return null;
}
}
+
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java?rev=1185412&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java Mon Oct 17 22:35:52 2011
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * <p>
+ * A capacity-capped {@link Channel} implementation that supports in-memory
+ * buffering and delivery of events.
+ * </p>
+ * <p>
+ * This channel is appropriate for
+ * <q>best effort</q> delivery of events where high throughput is favored over
+ * data durability. To be clear, <b>this channel offers absolutely no guarantee
+ * of event delivery</b> in the face of (any) component failure.
+ * </p>
+ * <p>
+ * TODO: Discuss guarantees, corner cases re: potential data loss (e.g. consumer
+ * begins a tx, takes events, and gets SIGKILL before rollback).
+ * </p>
+ * <p>
+ * <b>Configuration options</b>
+ * </p>
+ * <table>
+ * <tr>
+ * <th>Parameter</th>
+ * <th>Description</th>
+ * <th>Unit / Type</th>
+ * <th>Default</th>
+ * </tr>
+ * <tr>
+ * <td><tt>capacity</tt></td>
+ * <td>The in-memory capacity of this channel. Store up to <tt>capacity</tt>
+ * events before refusing new events.</td>
+ * <td>events / int</td>
+ * <td>50</td>
+ * </tr>
+ * <tr>
+ * <td><tt>keep-alive</tt></td>
+ * <td>The amount of time (seconds) to wait for an event before returning
+ * <tt>null</tt> on {@link #take()}.</td>
+ * <td>seconds / int</td>
+ * <td>3</td>
+ * </tr>
+ * </table>
+ * <p>
+ * <b>Metrics</b>
+ * </p>
+ * <p>
+ * TODO
+ * </p>
+ */
+public class PseudoTxnMemoryChannel implements Channel, Configurable {
+
+ private static final Integer defaultCapacity = 50;
+ private static final Integer defaultKeepAlive = 3;
+
+ private BlockingQueue<Event> queue;
+ private Integer keepAlive;
+
+ @Override
+ public void configure(Context context) {
+ Integer capacity = context.get("capacity", Integer.class);
+ keepAlive = context.get("keep-alive", Integer.class);
+
+ if (capacity == null) {
+ capacity = defaultCapacity;
+ }
+
+ if (keepAlive == null) {
+ keepAlive = defaultKeepAlive;
+ }
+
+ queue = new ArrayBlockingQueue<Event>(capacity);
+ }
+
+ @Override
+ public void put(Event event) {
+ Preconditions.checkState(queue != null,
+ "No queue defined (Did you forget to configure me?");
+
+ try {
+ queue.put(event);
+ } catch (InterruptedException ex) {
+ throw new ChannelException("Failed to put(" + event + ")", ex);
+ }
+ }
+
+ @Override
+ public Event take() {
+ Preconditions.checkState(queue != null,
+ "No queue defined (Did you forget to configure me?");
+
+ try {
+ return queue.poll(keepAlive, TimeUnit.SECONDS);
+ } catch (InterruptedException ex) {
+ throw new ChannelException("Failed to take()", ex);
+ }
+ }
+
+ @Override
+ public Transaction getTransaction() {
+ return NoOpTransaction.sharedInstance();
+ }
+
+ /**
+ * <p>
+ * A no-op transaction implementation that does nothing at all.
+ * </p>
+ */
+ public static class NoOpTransaction implements Transaction {
+
+ private static NoOpTransaction sharedInstance;
+
+ public static Transaction sharedInstance() {
+ if (sharedInstance == null) {
+ sharedInstance = new NoOpTransaction();
+ }
+
+ return sharedInstance;
+ }
+
+ @Override
+ public void begin() {
+ }
+
+ @Override
+ public void commit() {
+ }
+
+ @Override
+ public void rollback() {
+ }
+
+ @Override
+ public void close() {
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public String getName() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java?rev=1185412&r1=1185411&r2=1185412&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java Mon Oct 17 22:35:52 2011
@@ -16,7 +16,7 @@ public class TestMemoryChannelTransactio
@Before
public void setUp() {
- channel = new MultiOpMemChannel();
+ channel = new MemoryChannel();
}
@Test
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java?rev=1185412&r1=1185411&r2=1185412&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java Mon Oct 17 22:35:52 2011
@@ -13,6 +13,7 @@ import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSink;
+import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
@@ -83,9 +84,14 @@ public class TestAvroSink {
Assert.assertTrue(LifecycleController.waitForOneOf(sink,
LifecycleState.START_OR_ERROR, 5000));
+ Transaction transaction = channel.getTransaction();
+
+ transaction.begin();
for (int i = 0; i < 10; i++) {
channel.put(event);
}
+ transaction.commit();
+ transaction.close();
for (int i = 0; i < 5; i++) {
PollableSink.Status status = sink.process();
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java?rev=1185412&r1=1185411&r2=1185412&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoggerSink.java Mon Oct 17 22:35:52 2011
@@ -4,7 +4,7 @@ import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
-import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.PseudoTxnMemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.lifecycle.LifecycleException;
@@ -27,7 +27,7 @@ public class TestLoggerSink {
public void testAppend() throws InterruptedException, LifecycleException,
EventDeliveryException {
- Channel channel = new MemoryChannel();
+ Channel channel = new PseudoTxnMemoryChannel();
Context context = new Context();
Configurables.configure(channel, context);
Configurables.configure(sink, context);
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java?rev=1185412&r1=1185411&r2=1185412&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java Mon Oct 17 22:35:52 2011
@@ -9,7 +9,7 @@ import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
-import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.PseudoTxnMemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.lifecycle.LifecycleException;
@@ -65,7 +65,7 @@ public class TestRollingFileSink {
Configurables.configure(sink, context);
- Channel channel = new MemoryChannel();
+ Channel channel = new PseudoTxnMemoryChannel();
Configurables.configure(channel, context);
sink.setChannel(channel);
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java?rev=1185412&r1=1185411&r2=1185412&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java Mon Oct 17 22:35:52 2011
@@ -10,6 +10,7 @@ import org.apache.avro.ipc.specific.Spec
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
+import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.lifecycle.LifecycleController;
@@ -120,11 +121,17 @@ public class TestAvroSource {
Status status = client.append(avroEvent);
Assert.assertEquals(Status.OK, status);
+
+ Transaction transaction = channel.getTransaction();
+ transaction.begin();
Event event = channel.take();
Assert.assertNotNull(event);
Assert.assertEquals("Channel contained our event", "Hello avro",
new String(event.getBody()));
+ transaction.commit();
+ transaction.close();
+
logger.debug("Round trip event:{}", event);
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java?rev=1185412&r1=1185411&r2=1185412&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestSequenceGeneratorSource.java Mon Oct 17 22:35:52 2011
@@ -5,7 +5,7 @@ import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
-import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.PseudoTxnMemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.lifecycle.LifecycleException;
import org.junit.Assert;
@@ -25,7 +25,7 @@ public class TestSequenceGeneratorSource
public void testProcess() throws InterruptedException, LifecycleException,
EventDeliveryException {
- Channel channel = new MemoryChannel();
+ Channel channel = new PseudoTxnMemoryChannel();
Context context = new Context();
context.put("logicalNode.name", "test");
@@ -48,7 +48,7 @@ public class TestSequenceGeneratorSource
public void testLifecycle() throws InterruptedException,
EventDeliveryException {
- Channel channel = new MemoryChannel();
+ Channel channel = new PseudoTxnMemoryChannel();
Context context = new Context();
context.put("logicalNode.name", "test");
Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java?rev=1185412&r1=1185411&r2=1185412&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java Mon Oct 17 22:35:52 2011
@@ -14,6 +14,7 @@ import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.EventDrivenSource;
+import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.lifecycle.LifecycleException;
@@ -78,6 +79,9 @@ public class TestNetcatSource {
};
+ Transaction tx = source.getChannel().getTransaction();
+ tx.begin();
+
for (int i = 0; i < 100; i++) {
executor.submit(clientRequestRunnable);
@@ -87,6 +91,8 @@ public class TestNetcatSource {
Assert.assertArrayEquals("Test message".getBytes(), event.getBody());
}
+ tx.commit();
+ tx.close();
executor.shutdown();
while (!executor.isTerminated()) {
Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java?rev=1185412&r1=1185411&r2=1185412&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java Mon Oct 17 22:35:52 2011
@@ -30,7 +30,6 @@ import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.channel.MultiOpMemChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.lifecycle.LifecycleException;
@@ -146,12 +145,11 @@ public class TestHDFSEventSink extends T
sink.setChannel(channel);
sink.start();
- Transaction txn = channel.getTransaction();
-
Calendar eventDate = Calendar.getInstance();
// push the event batches into channel
for (i = 1; i < 4; i++) {
+ Transaction txn = channel.getTransaction();
txn.begin();
for (j = 1; j <= txnMax; j++) {
Event event = new SimpleEvent();
@@ -166,6 +164,7 @@ public class TestHDFSEventSink extends T
totalEvents++;
}
txn.commit();
+ txn.close();
// execute sink to process the events
sink.process();
@@ -235,7 +234,7 @@ public class TestHDFSEventSink extends T
Configurables.configure(sink, context);
- Channel channel = new MultiOpMemChannel();
+ Channel channel = new MemoryChannel();
Configurables.configure(channel, context);
sink.setChannel(channel);
@@ -335,12 +334,11 @@ public class TestHDFSEventSink extends T
sink.setChannel(channel);
sink.start();
- Transaction txn = channel.getTransaction();
-
Calendar eventDate = Calendar.getInstance();
// push the event batches into channel
for (int i = 1; i < 4; i++) {
+ Transaction txn = channel.getTransaction();
txn.begin();
for (int j = 1; j <= txnMax; j++) {
Event event = new SimpleEvent();
@@ -354,6 +352,7 @@ public class TestHDFSEventSink extends T
channel.put(event);
}
txn.commit();
+ txn.close();
// execute sink to process the events
sink.process();
@@ -414,7 +413,7 @@ public class TestHDFSEventSink extends T
Configurables.configure(sink, context);
- Channel channel = new MultiOpMemChannel();
+ Channel channel = new MemoryChannel();
Configurables.configure(channel, context);
sink.setChannel(channel);