You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/10/20 03:47:07 UTC

svn commit: r1186599 - /incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java

Author: esammer
Date: Thu Oct 20 01:47:07 2011
New Revision: 1186599

URL: http://svn.apache.org/viewvc?rev=1186599&view=rev
Log:
FLUME-806: Fix cast exception in MemoryChannel due to config type changes

- Reformatted to fit with existing standards (with apologies to Prasad).

Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java?rev=1186599&r1=1186598&r2=1186599&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java Thu Oct 20 01:47:07 2011
@@ -17,33 +17,32 @@
  */
 package org.apache.flume.channel;
 
+import java.util.LinkedList;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.LinkedList;
 
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.Transaction;
-//import org.apache.flume.channel.MemoryChannel.MemTransaction;
 import org.apache.flume.conf.Configurable;
 
 import com.google.common.base.Preconditions;
 
 /**
- * Memory channel that with full transaction support
- * Uses transaction object for each thread (source and sink) attached to channel.
- * The events are stored in the thread safe Dequeue.  * The put and take are directly 
- * executed in the common queue. Channel has a marker for the last committed event in
- * order to avoid sink reading uncommitted data.
- * The transactions keep track of the actions to perform undo when rolled back.
- *
+ * Memory channel that with full transaction support Uses transaction object for
+ * each thread (source and sink) attached to channel. The events are stored in
+ * the thread safe Dequeue. * The put and take are directly executed in the
+ * common queue. Channel has a marker for the last committed event in order to
+ * avoid sink reading uncommitted data. The transactions keep track of the
+ * actions to perform undo when rolled back.
+ * 
  */
 public class MemoryChannel implements Channel, Configurable {
 
@@ -55,7 +54,7 @@ public class MemoryChannel implements Ch
     private int timeStamp;
     private Event event;
 
-    public StampedEvent (int stamp, Event E) {
+    public StampedEvent(int stamp, Event E) {
       timeStamp = stamp;
       event = E;
     }
@@ -70,12 +69,11 @@ public class MemoryChannel implements Ch
 
   }
 
-  /* transaction class
-   * maintain a 'undo' list for put/take from the queue. 
-   * The rollback performs undo of the operations using these lists.
-   * Also maintain a stamp/counter for commit and last take.
-   * This is used to ensure that a transaction doesn't read
-   * uncommitted events. 
+  /*
+   * transaction class maintain a 'undo' list for put/take from the queue. The
+   * rollback performs undo of the operations using these lists. Also maintain a
+   * stamp/counter for commit and last take. This is used to ensure that a
+   * transaction doesn't read uncommitted events.
    */
   public class MemTransaction implements Transaction {
     private int putStamp;
@@ -84,7 +82,7 @@ public class MemoryChannel implements Ch
     private LinkedList<StampedEvent> undoPutList;
     private TransactionState txnState;
 
-    public MemTransaction () {
+    public MemTransaction() {
       txnState = TransactionState.Closed;
     }
 
@@ -96,7 +94,7 @@ public class MemoryChannel implements Ch
      */
     public void begin() {
       undoTakeList = new LinkedList<StampedEvent>();
-      undoPutList = new LinkedList<StampedEvent> ();
+      undoPutList = new LinkedList<StampedEvent>();
       putStamp = 0;
       takeStamp = 0;
       txnState = TransactionState.Started;
@@ -109,15 +107,15 @@ public class MemoryChannel implements Ch
      *  commit stamp set transaction state to Committed
      */
     public void commit() {
-      Preconditions.checkArgument(txnState == TransactionState.Started, 
+      Preconditions.checkArgument(txnState == TransactionState.Started,
           "transaction not started");
       // if the txn put any events, then update the channel's stamp and
       // signal for availability of committed data in the queue
       if (putStamp != 0) {
-         lastCommitStamp.set(putStamp);
-         lock.lock();
-         hasData.signal();
-         lock.unlock();
+        lastCommitStamp.set(putStamp);
+        lock.lock();
+        hasData.signal();
+        lock.unlock();
       }
       txnState = TransactionState.Committed;
       undoPutList.clear();
@@ -131,7 +129,7 @@ public class MemoryChannel implements Ch
      *  set transaction state to rolled back
      */
     public void rollback() {
-      Preconditions.checkArgument(txnState == TransactionState.Started, 
+      Preconditions.checkArgument(txnState == TransactionState.Started,
           "transaction not started");
       undoPut(this);
       undoTake(this);
@@ -155,7 +153,7 @@ public class MemoryChannel implements Ch
     protected int lastTakeStamp() {
       return takeStamp;
     }
-    
+
     protected void logPut(StampedEvent e, int stamp) {
       undoPutList.addLast(e);
       putStamp = stamp;
@@ -165,7 +163,7 @@ public class MemoryChannel implements Ch
       undoTakeList.addLast(e);
       takeStamp = stamp;
     }
-    
+
     protected StampedEvent removePut() {
       if (undoPutList.isEmpty()) {
         return null;
@@ -186,15 +184,15 @@ public class MemoryChannel implements Ch
 
   // The main event queue
   private LinkedBlockingDeque<StampedEvent> queue;
-  
+
   private AtomicInteger currentStamp; // operation counter
   private AtomicInteger lastCommitStamp; // counter for the last commit
   private ConcurrentHashMap<Long, MemTransaction> txnMap; // open transactions
   private Integer keepAlive;
   final Lock lock = new ReentrantLock();
-  final Condition hasData  = lock.newCondition(); 
+  final Condition hasData = lock.newCondition();
 
-  /** 
+  /**
    * Channel constructor
    */
   public MemoryChannel() {
@@ -203,25 +201,29 @@ public class MemoryChannel implements Ch
     txnMap = new ConcurrentHashMap<Long, MemTransaction>();
   }
 
-  /** 
+  /**
    * set the event queue capacity
    */
   @Override
   public void configure(Context context) {
+    String strCapacity = context.get("capacity", String.class);
+    Integer capacity = null;
 
-    Integer capacity = context.get("capacity", Integer.class);
-    if (capacity == null) {
+    if (strCapacity == null) {
       capacity = defaultCapacity;
+    } else {
+      capacity = Integer.parseInt(strCapacity);
     }
 
     keepAlive = context.get("keep-alive", Integer.class);
+
     if (keepAlive == null) {
       keepAlive = defaultKeepAlive;
     }
 
     queue = new LinkedBlockingDeque<StampedEvent>(capacity);
   }
-  
+
   @Override
   /** 
    * Add the given event to the end of the queue
@@ -240,29 +242,29 @@ public class MemoryChannel implements Ch
       StampedEvent stampedEvent = new StampedEvent(myStamp, event);
       queue.put(stampedEvent);
       myTxn.logPut(stampedEvent, myStamp);
-      
+
     } catch (InterruptedException ex) {
       throw new ChannelException("Failed to put(" + event + ")", ex);
     }
   }
 
   /**
-   * undo of put
-   *  for all the events in the undoPut queue, remove those from the event queue
+   * undo of put for all the events in the undoPut queue, remove those from the
+   * event queue
+   * 
    * @param myTxn
    */
-  protected void undoPut(MemTransaction myTxn ) {
+  protected void undoPut(MemTransaction myTxn) {
     StampedEvent undoEvent;
     StampedEvent currentEvent;
 
     while ((undoEvent = myTxn.removePut()) != null) {
       currentEvent = queue.removeLast();
       Preconditions.checkNotNull(currentEvent, "Rollback error");
-      Preconditions.checkArgument(currentEvent == undoEvent ,
-          "Rollback error");
+      Preconditions.checkArgument(currentEvent == undoEvent, "Rollback error");
     }
   }
-  
+
   @Override
   /**
    * remove the event from the top of the queue and return it
@@ -273,7 +275,7 @@ public class MemoryChannel implements Ch
 
     try {
       MemTransaction myTxn = findTransaction();
-      Preconditions.checkState(myTxn != null, "Transaction not started");     
+      Preconditions.checkState(myTxn != null, "Transaction not started");
       Event event = null;
       int timeout = keepAlive;
 
@@ -289,9 +291,9 @@ public class MemoryChannel implements Ch
       if (myTxn.lastTakeStamp() != lastCommitStamp.get()) {
         StampedEvent e = queue.poll(timeout, TimeUnit.SECONDS);
         if (e != null) {
-           myTxn.logTake(e, e.getStamp());
-           event = e.getEvent();           
-         }
+          myTxn.logTake(e, e.getStamp());
+          event = e.getEvent();
+        }
       }
       return event;
     } catch (InterruptedException ex) {
@@ -300,8 +302,9 @@ public class MemoryChannel implements Ch
   }
 
   /**
-   * undo of take operation
-   * for each event in the undoTake list, add it back to the event queue
+   * undo of take operation for each event in the undoTake list, add it back to
+   * the event queue
+   * 
    * @param myTxn
    */
   protected void undoTake(MemTransaction myTxn) {
@@ -311,7 +314,7 @@ public class MemoryChannel implements Ch
       queue.addFirst(e);
     }
   }
-  
+
   @Override
   /**
    * Return the channel's transaction
@@ -321,7 +324,7 @@ public class MemoryChannel implements Ch
 
     // check if there's already a transaction created for this thread
     txn = findTransaction();
-    
+
     // Create a new transaction
     if (txn == null) {
       txn = new MemTransaction();
@@ -332,6 +335,7 @@ public class MemoryChannel implements Ch
 
   /**
    * Remove the given transaction from the list of open transactions
+   * 
    * @param myTxn
    */
   protected void forgetTransaction(MemTransaction myTxn) {
@@ -339,7 +343,7 @@ public class MemoryChannel implements Ch
     Preconditions.checkArgument(myTxn == currTxn, "Wrong transaction to close");
     txnMap.remove(Thread.currentThread().getId());
   }
-  
+
   // lookup the transaction for the current thread
   protected MemTransaction findTransaction() {
     try {
@@ -361,4 +365,3 @@ public class MemoryChannel implements Ch
     return null;
   }
 }
-