You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2011/10/12 18:53:32 UTC

svn commit: r1182477 - in /incubator/flume/branches/flume-728/flume-ng-core/src: main/java/org/apache/flume/channel/MultiOpMemChannel.java test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java

Author: arvind
Date: Wed Oct 12 16:53:32 2011
New Revision: 1182477

URL: http://svn.apache.org/viewvc?rev=1182477&view=rev
Log:
FLUME-722. MemoryChannel should push events back into channel on rollback.

(Prasad Mujumdar via Arvind Prabhakar)

Added:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MultiOpMemChannel.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java

Added: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MultiOpMemChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MultiOpMemChannel.java?rev=1182477&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MultiOpMemChannel.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MultiOpMemChannel.java Wed Oct 12 16:53:32 2011
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel;
+
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.LinkedList;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+//import org.apache.flume.channel.MultiOpMemChannel.MemTransaction;
+import org.apache.flume.conf.Configurable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * In-memory {@link Channel} with transaction support.
+ *
+ * <p>Every thread (source or sink) attached to the channel works through its
+ * own {@link MemTransaction}, looked up by thread id. Puts and takes are
+ * executed directly on one shared, bounded deque, and each event is wrapped
+ * with a stamp drawn from a channel-wide counter. The channel remembers the
+ * stamp of the last committed put and compares it with the stamp of the
+ * caller's last take to decide whether a take may proceed. Each transaction
+ * records what it put and took so that a rollback can undo those operations.
+ */
+public class MultiOpMemChannel implements Channel, Configurable {
+
+  private static final Integer DEFAULT_CAPACITY = 50;
+
+  /** An event paired with the stamp it was given when it was put. */
+  private static final class StampedEvent {
+    private final int timeStamp;
+    private final Event event;
+
+    public StampedEvent(int stamp, Event event) {
+      this.timeStamp = stamp;
+      this.event = event;
+    }
+
+    public int getStamp() {
+      return timeStamp;
+    }
+
+    public Event getEvent() {
+      return event;
+    }
+  }
+
+  /**
+   * Transaction bound to one thread.
+   *
+   * <p>Keeps an undo list for puts and another for takes; {@link #rollback()}
+   * uses them to reverse the operations on the queue. Also tracks the stamp
+   * of the last put (published by {@link #commit()}) and of the last take
+   * (compared with the channel's last commit stamp on every take).
+   */
+  public class MemTransaction implements Transaction {
+    private int putStamp;
+    private int takeStamp;
+    private LinkedList<StampedEvent> undoTakeList;
+    private LinkedList<StampedEvent> undoPutList;
+    private TransactionState txnState;
+
+    public MemTransaction() {
+      txnState = TransactionState.Closed;
+    }
+
+    /**
+     * Starts the transaction: creates empty undo lists, zeroes both stamps
+     * and sets the state to Started.
+     */
+    @Override
+    public void begin() {
+      undoTakeList = new LinkedList<StampedEvent>();
+      undoPutList = new LinkedList<StampedEvent>();
+      putStamp = 0;
+      takeStamp = 0;
+      txnState = TransactionState.Started;
+    }
+
+    /**
+     * Commits the transaction. If it put any event, the stamp of its last
+     * put becomes the channel's last commit stamp.
+     *
+     * @throws IllegalArgumentException if the state is not Started
+     */
+    @Override
+    public void commit() {
+      Preconditions.checkArgument(txnState == TransactionState.Started,
+          "transaction not started");
+      // putStamp is still 0 when this transaction put nothing
+      if (putStamp != 0) {
+        lastCommitStamp.set(putStamp);
+      }
+      txnState = TransactionState.Committed;
+    }
+
+    /**
+     * Rolls the transaction back: the events it put are removed from the
+     * queue and the events it took are returned to the head of the queue.
+     *
+     * @throws IllegalArgumentException if the state is not Started
+     */
+    @Override
+    public void rollback() {
+      Preconditions.checkArgument(txnState == TransactionState.Started,
+          "transaction not started");
+      undoPut(this);
+      undoTake(this);
+      txnState = TransactionState.RolledBack;
+    }
+
+    /**
+     * Closes the transaction, rolling it back first if it is still in the
+     * Started state, and removes it from the channel's open transactions.
+     */
+    @Override
+    public void close() {
+      if (txnState == TransactionState.Started) {
+        rollback();
+      }
+      txnState = TransactionState.Closed;
+      forgetTransaction(this);
+    }
+
+    protected LinkedList<StampedEvent> getUndoTakeList() {
+      return undoTakeList;
+    }
+
+    protected LinkedList<StampedEvent> getUndoPutList() {
+      return undoPutList;
+    }
+
+    protected void setPutStamp(int stamp) {
+      putStamp = stamp;
+    }
+
+    protected void setLastTakeStamp(int stamp) {
+      takeStamp = stamp;
+    }
+
+    protected int lastTakeStamp() {
+      return takeStamp;
+    }
+
+    /** Fails unless the transaction is currently in the Started state. */
+    private void checkStarted() {
+      Preconditions.checkState(txnState == TransactionState.Started,
+          "transaction not started");
+    }
+  }
+
+  // The main event queue; created by configure()
+  private LinkedBlockingDeque<StampedEvent> queue;
+
+  private final AtomicInteger currentStamp; // operation counter
+  private final AtomicInteger lastCommitStamp; // stamp of the last commit
+  private final ConcurrentHashMap<Long, MemTransaction> txnMap; // open txns
+
+  /**
+   * Creates an unconfigured channel; {@link #configure(Context)} must be
+   * called before events can be put or taken.
+   */
+  public MultiOpMemChannel() {
+    currentStamp = new AtomicInteger(1);
+    lastCommitStamp = new AtomicInteger(0);
+    txnMap = new ConcurrentHashMap<Long, MemTransaction>();
+  }
+
+  /**
+   * Creates the event queue, bounded by the "capacity" entry of the context
+   * or by the default capacity (50) when that entry is absent.
+   */
+  @Override
+  public void configure(Context context) {
+    Integer capacity = context.get("capacity", Integer.class);
+
+    if (capacity == null) {
+      capacity = DEFAULT_CAPACITY;
+    }
+    queue = new LinkedBlockingDeque<StampedEvent>(capacity);
+  }
+
+  /**
+   * Adds the event to the end of the queue, saves it in the transaction's
+   * undo list for a possible rollback and saves its stamp for commit.
+   * Blocks while the queue is full.
+   *
+   * @throws IllegalStateException if the channel is not configured or the
+   *         calling thread's transaction is not in the Started state
+   * @throws NullPointerException if the calling thread has no transaction
+   * @throws ChannelException if interrupted while waiting for free space
+   */
+  @Override
+  public void put(Event event) {
+    Preconditions.checkState(queue != null,
+        "No queue defined (Did you forget to configure me?)");
+    // validate the transaction before touching the queue so that a failure
+    // cannot leave an event in the queue without an undo record
+    MemTransaction myTxn = startedTransaction();
+
+    int myStamp = currentStamp.getAndIncrement();
+    StampedEvent stampedEvent = new StampedEvent(myStamp, event);
+    try {
+      queue.put(stampedEvent);
+    } catch (InterruptedException ex) {
+      // keep the interrupt visible to the caller after wrapping it
+      Thread.currentThread().interrupt();
+      throw new ChannelException("Failed to put(" + event + ")", ex);
+    }
+    myTxn.getUndoPutList().addLast(stampedEvent);
+    myTxn.setPutStamp(myStamp);
+  }
+
+  /**
+   * Undo of put: for every event in the transaction's undo list, newest
+   * first, removes the event at the tail of the queue.
+   *
+   * @param myTxn transaction whose puts are undone
+   * @throws NullPointerException if the queue runs out of events before the
+   *         undo list does
+   * @throws IllegalArgumentException if the queue tail is not the event
+   *         being undone
+   */
+  protected void undoPut(MemTransaction myTxn) {
+    StampedEvent undoEvent;
+
+    // pollLast() yields null once drained; removeLast() would throw
+    // NoSuchElementException there and make every rollback fail
+    while ((undoEvent = myTxn.getUndoPutList().pollLast()) != null) {
+      StampedEvent currentEvent = queue.pollLast();
+      Preconditions.checkNotNull(currentEvent, "Rollback error");
+      Preconditions.checkArgument(currentEvent == undoEvent,
+          "Rollback error");
+    }
+  }
+
+  /**
+   * Removes the event at the head of the queue and returns it, saving it in
+   * the transaction's undo list for a possible rollback. Never blocks.
+   *
+   * @return the event, or {@code null} if the queue is empty or the stamp
+   *         of this transaction's last take equals the last commit stamp
+   * @throws IllegalStateException if the channel is not configured or the
+   *         calling thread's transaction is not in the Started state
+   * @throws NullPointerException if the calling thread has no transaction
+   */
+  @Override
+  public Event take() {
+    Preconditions.checkState(queue != null, "Queue not configured");
+    MemTransaction myTxn = startedTransaction();
+
+    // don't go past the last committed element
+    if (myTxn.lastTakeStamp() == lastCommitStamp.get()) {
+      return null;
+    }
+    // poll(), not take(): an empty queue must yield null rather than block
+    // the caller until some other thread puts an event
+    StampedEvent e = queue.poll();
+    if (e == null) {
+      return null;
+    }
+    myTxn.getUndoTakeList().addLast(e);
+    myTxn.setLastTakeStamp(e.getStamp());
+    return e.getEvent();
+  }
+
+  /**
+   * Undo of take: returns every event in the transaction's undo list,
+   * newest first, to the head of the queue so the original order is kept.
+   *
+   * @param myTxn transaction whose takes are undone
+   */
+  protected void undoTake(MemTransaction myTxn) {
+    StampedEvent e;
+
+    // pollLast() returns null on an empty list, which ends the loop;
+    // removeLast() would throw NoSuchElementException instead
+    while ((e = myTxn.getUndoTakeList().pollLast()) != null) {
+      queue.addFirst(e);
+    }
+  }
+
+  /**
+   * Returns the calling thread's transaction, creating and registering one
+   * if the thread has none.
+   */
+  @Override
+  public Transaction getTransaction() {
+    MemTransaction txn = findTransaction();
+
+    if (txn == null) {
+      txn = new MemTransaction();
+      txnMap.put(Thread.currentThread().getId(), txn);
+    }
+    return txn;
+  }
+
+  /**
+   * Removes the given transaction from the open transactions.
+   *
+   * @param myTxn transaction being closed
+   * @throws IllegalArgumentException if it is not the transaction registered
+   *         for the calling thread
+   */
+  protected void forgetTransaction(MemTransaction myTxn) {
+    MemTransaction currTxn = findTransaction();
+    Preconditions.checkArgument(myTxn == currTxn, "Wrong transaction to close");
+    txnMap.remove(Thread.currentThread().getId());
+  }
+
+  /** Looks up the calling thread's transaction; {@code null} if none. */
+  protected MemTransaction findTransaction() {
+    return txnMap.get(Thread.currentThread().getId());
+  }
+
+  /**
+   * Returns the calling thread's transaction after verifying that it exists
+   * and is in the Started state.
+   */
+  private MemTransaction startedTransaction() {
+    MemTransaction txn = findTransaction();
+    Preconditions.checkNotNull(txn, "transaction not started");
+    txn.checkStarted();
+    return txn;
+  }
+
+  /** Not implemented yet; currently does nothing. */
+  @Override
+  public void shutdown() {
+    // TODO Auto-generated method stub
+
+  }
+
+  /** Not implemented yet; currently always returns {@code null}. */
+  @Override
+  public String getName() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+}
+

Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java?rev=1182477&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java Wed Oct 12 16:53:32 2011
@@ -0,0 +1,126 @@
+package org.apache.flume.channel;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests commit and rollback of {@link MultiOpMemChannel} transactions.
+ *
+ * <p>A transaction is obtained from the channel again after every
+ * {@code close()}: the channel drops a closed transaction, so the old object
+ * is no longer the one the channel finds for the current thread.
+ */
+public class TestMemoryChannelTransaction {
+  private static final int EVENT_COUNT = 10;
+
+  private Channel channel;
+
+  @Before
+  public void setUp() {
+    channel = new MultiOpMemChannel();
+  }
+
+  /** Events put in a committed transaction can be taken, in order. */
+  @Test
+  public void testCommit() throws InterruptedException, EventDeliveryException {
+    Configurables.configure(channel, new Context());
+
+    Transaction transaction = beginTransaction();
+    putEvents();
+    transaction.commit();
+    transaction.close();
+
+    transaction = beginTransaction();
+    takeAndVerifyEvents();
+    transaction.commit();
+    transaction.close();
+  }
+
+  /**
+   * Rolled-back puts leave nothing in the channel, and rolled-back takes
+   * leave the events in the channel in their original order.
+   */
+  @Test
+  public void testRollBack() throws InterruptedException, EventDeliveryException {
+    Configurables.configure(channel, new Context());
+
+    // add events and rollback txn
+    Transaction transaction = beginTransaction();
+    putEvents();
+    transaction.rollback();
+    transaction.close();
+
+    // verify that no events are stored due to rollback
+    transaction = beginTransaction();
+    Assert.assertNull("extra event found", channel.take());
+    transaction.commit();
+    transaction.close();
+
+    // add events and commit
+    transaction = beginTransaction();
+    putEvents();
+    transaction.commit();
+    transaction.close();
+
+    // verify events are there, then rollback the take
+    transaction = beginTransaction();
+    takeAndVerifyEvents();
+    transaction.rollback();
+    transaction.close();
+
+    // verify that the events were left in there due to rollback
+    transaction = beginTransaction();
+    takeAndVerifyEvents();
+    transaction.rollback();
+    transaction.close();
+  }
+
+  /** Obtains the current thread's transaction from the channel and begins it. */
+  private Transaction beginTransaction()
+      throws InterruptedException, EventDeliveryException {
+    Transaction transaction = channel.getTransaction();
+    Assert.assertNotNull(transaction);
+    transaction.begin();
+    return transaction;
+  }
+
+  /** Puts EVENT_COUNT events with bodies "test event0", "test event1", ... */
+  private void putEvents() throws InterruptedException, EventDeliveryException {
+    for (int i = 0; i < EVENT_COUNT; i++) {
+      Event event = EventBuilder.withBody(("test event" + i).getBytes());
+      channel.put(event);
+    }
+  }
+
+  /**
+   * Takes EVENT_COUNT events, checking that each body matches what
+   * putEvents() wrote at that position, then checks that one more take
+   * returns null.
+   */
+  private void takeAndVerifyEvents()
+      throws InterruptedException, EventDeliveryException {
+    for (int i = 0; i < EVENT_COUNT; i++) {
+      Event event = channel.take();
+      Assert.assertNotNull("lost an event", event);
+      // expected value first, actual second, so failure messages read right
+      Assert.assertArrayEquals(("test event" + i).getBytes(), event.getBody());
+    }
+    Assert.assertNull("extra event found", channel.take());
+  }
+
+}