You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2011/10/12 18:53:32 UTC
svn commit: r1182477 - in
/incubator/flume/branches/flume-728/flume-ng-core/src:
main/java/org/apache/flume/channel/MultiOpMemChannel.java
test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java
Author: arvind
Date: Wed Oct 12 16:53:32 2011
New Revision: 1182477
URL: http://svn.apache.org/viewvc?rev=1182477&view=rev
Log:
SQOOP-722. MemoryChannel should push events back into channel on rollback.
(Prasad Mujumdar via Arvind Prbhakar)
Added:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MultiOpMemChannel.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java
Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MultiOpMemChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MultiOpMemChannel.java?rev=1182477&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MultiOpMemChannel.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MultiOpMemChannel.java Wed Oct 12 16:53:32 2011
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel;
+
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.LinkedList;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+//import org.apache.flume.channel.MultiOpMemChannel.MemTransaction;
+import org.apache.flume.conf.Configurable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Memory channel that with full transaction support
+ * Uses transaction object for each thread (source and sink) attached to channel.
+ * The events are stored in the thread safe Dequeue. * The put and take are directly
+ * executed in the common queue. Channel has a marker for the last committed event in
+ * order to avoid sink reading uncommitted data.
+ * The transactions keep track of the actions to perform undo when rolled back.
+ *
+ */
+public class MultiOpMemChannel implements Channel, Configurable {
+
+ private static final Integer defaultCapacity = 50;
+
+ // wrap the event with a counter
+ private class StampedEvent {
+ int timeStamp;
+ Event event;
+
+ public StampedEvent (int stamp, Event E) {
+ timeStamp = stamp;
+ event = E;
+ }
+
+ public int getStamp() {
+ return timeStamp;
+ }
+
+ public Event getEvent() {
+ return event;
+ }
+
+ }
+
+ /* transaction class
+ * maintain a 'undo' list for put/take from the queue.
+ * The rollback performs undo of the operations using these lists.
+ * Also maintain a stamp/counter for commit and last take.
+ * This is used to ensure that a transaction doesn't read
+ * uncommitted events.
+ */
+ public class MemTransaction implements Transaction {
+ private int putStamp;
+ private int takeStamp;
+ private LinkedList<StampedEvent> undoTakeList;
+ private LinkedList<StampedEvent> undoPutList;
+ private TransactionState txnState;
+
+ public MemTransaction () {
+ txnState = TransactionState.Closed;
+ }
+
+ @Override
+ /**
+ * Start the transaction
+ * initialize the undo lists, stamps
+ * set transaction state to Started
+ */
+ public void begin() {
+ undoTakeList = new LinkedList<StampedEvent>();
+ undoPutList = new LinkedList<StampedEvent> ();
+ putStamp = 0;
+ takeStamp = 0;
+ txnState = TransactionState.Started;
+ }
+
+ @Override
+ /**
+ * Commit the transaction
+ * If there was an event added by this transaction, then set the commit stamp
+ * set transaction state to Committed
+ */
+ public void commit() {
+ Preconditions.checkArgument(txnState == TransactionState.Started,
+ "transaction not started");
+ // if the txn put any events,
+ if (putStamp != 0) {
+ lastCommitStamp.set(putStamp);
+ }
+ txnState = TransactionState.Committed;
+ }
+
+ @Override
+ /**
+ * Rollback the transaction
+ * execute the channel's undoXXX to undo the actions done by this txn
+ * set transaction state to rolled back
+ */
+ public void rollback() {
+ Preconditions.checkArgument(txnState == TransactionState.Started,
+ "transaction not started");
+ undoPut(this);
+ undoTake(this);
+ txnState = TransactionState.RolledBack;
+ }
+
+ @Override
+ /**
+ * Close the transaction
+ * if the transaction is still open, then roll it back
+ * set transaction state to Closed
+ */
+ public void close() {
+ if (txnState == TransactionState.Started) {
+ rollback();
+ }
+ txnState = TransactionState.Closed;
+ forgetTransaction(this);
+ }
+
+ protected LinkedList<StampedEvent> getUndoTakeList() {
+ return undoTakeList;
+ }
+
+ protected LinkedList<StampedEvent> getUndoPutList() {
+ return undoPutList;
+ }
+
+ protected void setPutStamp(int stamp) {
+ putStamp = stamp;
+ }
+
+ protected void setLastTakeStamp(int stamp) {
+ takeStamp = stamp;
+ }
+
+ protected int lastTakeStamp() {
+ return takeStamp;
+ }
+
+ }
+
+ // The main event queue
+ private LinkedBlockingDeque<StampedEvent> queue;
+
+ private AtomicInteger currentStamp; // operation counter
+ private AtomicInteger lastCommitStamp; // counter for the last commit
+ private ConcurrentHashMap<Long, MemTransaction> txnMap; // open transactions
+
+ /**
+ * Channel constructor
+ */
+ public MultiOpMemChannel() {
+ currentStamp = new AtomicInteger(1);
+ lastCommitStamp = new AtomicInteger(0);
+ txnMap = new ConcurrentHashMap<Long, MemTransaction>();
+ }
+
+ /**
+ * set the event queue capacity
+ */
+ @Override
+ public void configure(Context context) {
+ Integer capacity = context.get("capacity", Integer.class);
+
+ if (capacity == null) {
+ capacity = defaultCapacity;
+ }
+ queue = new LinkedBlockingDeque<StampedEvent>(capacity);
+ }
+
+ @Override
+ /**
+ * Add the given event to the end of the queue
+ * save the event in the undoPut queue for possible rollback
+ * save the stamp of this put for commit
+ */
+ public void put(Event event) {
+ Preconditions.checkState(queue != null,
+ "No queue defined (Did you forget to configure me?");
+
+ try {
+ MemTransaction myTxn = findTransaction();
+
+ Preconditions.checkNotNull(myTxn, "transaction not started");
+ int myStamp = currentStamp.getAndIncrement();
+ StampedEvent stampedEvent = new StampedEvent(myStamp, event);
+ queue.put(stampedEvent);
+ myTxn.getUndoPutList().addLast(stampedEvent);
+ myTxn.setPutStamp(myStamp);
+
+ } catch (InterruptedException ex) {
+ throw new ChannelException("Failed to put(" + event + ")", ex);
+ }
+ }
+
+ /**
+ * undo of put
+ * for all the events in the undoPut queue, remove those from the event queue
+ * @param myTxn
+ */
+ protected void undoPut(MemTransaction myTxn ) {
+ StampedEvent undoEvent;
+ StampedEvent currentEvent;
+
+ while ((undoEvent = myTxn.getUndoPutList().removeLast()) != null) {
+ currentEvent = queue.removeLast();
+ Preconditions.checkNotNull(currentEvent, "Rollback error");
+ Preconditions.checkArgument(currentEvent == undoEvent ,
+ "Rollback error");
+ }
+ }
+
+ @Override
+ /**
+ * remove the event from the top of the queue and return it
+ * also add that event to undoTake queue for possible rollback
+ */
+ public Event take() {
+ Preconditions.checkState(queue != null, "Queue not configured");
+
+ try {
+ MemTransaction myTxn = findTransaction();
+
+ Preconditions.checkNotNull(myTxn, "transaction not started");
+ // don't go past the last committed element
+ if ((myTxn.lastTakeStamp() != lastCommitStamp.get())) {
+ StampedEvent e = queue.take();
+ myTxn.getUndoTakeList().addLast(e);
+ myTxn.setLastTakeStamp(e.getStamp());
+ return e.getEvent();
+ } else {
+ return null;
+ }
+ } catch (InterruptedException ex) {
+ throw new ChannelException("Failed to take()", ex);
+ }
+ }
+
+ /**
+ * undo of take operation
+ * for each event in the undoTake list, add it back to the event queue
+ * @param myTxn
+ */
+ protected void undoTake(MemTransaction myTxn) {
+ StampedEvent e;
+
+ while ((e = myTxn.getUndoTakeList().removeLast()) != null) {
+ queue.addFirst(e);
+ }
+ }
+
+ @Override
+ /**
+ * Return the channel's transaction
+ */
+ public Transaction getTransaction() {
+ MemTransaction txn;
+
+ // check if there's already a transaction created for this thread
+ txn = findTransaction();
+
+ // Create a new transaction
+ if (txn == null) {
+ txn = new MemTransaction();
+ txnMap.put(Thread.currentThread().getId(), txn);
+ }
+ return txn;
+ }
+
+ /**
+ * Remove the given transaction from the list of open transactions
+ * @param myTxn
+ */
+ protected void forgetTransaction(MemTransaction myTxn) {
+ MemTransaction currTxn = findTransaction();
+ Preconditions.checkArgument(myTxn == currTxn, "Wrong transaction to close");
+ txnMap.remove(Thread.currentThread().getId());
+ }
+
+ // lookup the transaction for the current thread
+ protected MemTransaction findTransaction() {
+ try {
+ return txnMap.get(Thread.currentThread().getId());
+ } catch (NullPointerException eN) {
+ return null;
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public String getName() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+}
+
Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java?rev=1182477&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java Wed Oct 12 16:53:32 2011
@@ -0,0 +1,126 @@
+package org.apache.flume.channel;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestMemoryChannelTransaction {
+ private Channel channel;
+
+ @Before
+ public void setUp() {
+ channel = new MultiOpMemChannel();
+ }
+
+ @Test
+ public void testCommit() throws InterruptedException, EventDeliveryException {
+
+ Event event, event2;
+ Context context = new Context();
+ int putCounter = 0;
+
+ Configurables.configure(channel, context);
+
+ Transaction transaction = channel.getTransaction();
+ Assert.assertNotNull(transaction);
+
+ transaction.begin();
+ for (putCounter = 0; putCounter < 10; putCounter++) {
+ event = EventBuilder.withBody(("test event" + putCounter).getBytes());
+ channel.put(event);
+ }
+ transaction.commit();
+ transaction.close();
+
+ transaction = channel.getTransaction();
+ Assert.assertNotNull(transaction);
+
+ transaction.begin();
+ for (int i = 0; i < 10; i++ ) {
+ event2 = channel.take();
+ Assert.assertNotNull("lost an event", event2);
+ Assert.assertArrayEquals(event2.getBody(), ("test event" + i).getBytes());
+// System.out.println(event2.toString());
+ }
+ event2 = channel.take();
+ Assert.assertNull("extra event found", event2);
+
+ transaction.commit();
+ transaction.close();
+ }
+
+ public void testRollBack() throws InterruptedException, EventDeliveryException {
+
+ Event event, event2;
+ Context context = new Context();
+ int putCounter = 0;
+
+ Configurables.configure(channel, context);
+
+ Transaction transaction = channel.getTransaction();
+ Assert.assertNotNull(transaction);
+
+ // add events and rollback txn
+ transaction.begin();
+ for (putCounter = 0; putCounter < 10; putCounter++) {
+ event = EventBuilder.withBody(("test event" + putCounter).getBytes());
+ channel.put(event);
+ }
+ transaction.rollback();
+ transaction.close();
+
+ // verify that no events are stored due to rollback
+ transaction.begin();
+ event2 = channel.take();
+ Assert.assertNull("extra event found", event2);
+ transaction.commit();
+ transaction.close();
+
+ // add events and commit
+ transaction.begin();
+ for (putCounter = 0; putCounter < 10; putCounter++) {
+ event = EventBuilder.withBody(("test event" + putCounter).getBytes());
+ channel.put(event);
+ }
+ transaction.commit();
+ transaction.close();
+
+ transaction = channel.getTransaction();
+ Assert.assertNotNull(transaction);
+
+ // verify events are there, then rollback the take
+ transaction.begin();
+ for (int i = 0; i < 10; i++ ) {
+ event2 = channel.take();
+ Assert.assertNotNull("lost an event", event2);
+ Assert.assertArrayEquals(event2.getBody(), ("test event" + i).getBytes());
+ }
+ event2 = channel.take();
+ Assert.assertNull("extra event found", event2);
+
+ transaction.rollback();
+ transaction.close();
+
+
+ // verify that the events were left in there due to rollback
+ transaction.begin();
+ for (int i = 0; i < 10; i++ ) {
+ event2 = channel.take();
+ Assert.assertNotNull("lost an event", event2);
+ Assert.assertArrayEquals(event2.getBody(), ("test event" + i).getBytes());
+ }
+ event2 = channel.take();
+ Assert.assertNull("extra event found", event2);
+
+ transaction.rollback();
+ transaction.close();
+ }
+
+}