You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/10/20 03:47:07 UTC
svn commit: r1186599 -
/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
Author: esammer
Date: Thu Oct 20 01:47:07 2011
New Revision: 1186599
URL: http://svn.apache.org/viewvc?rev=1186599&view=rev
Log:
FLUME-806: Fix cast exception in MemoryChannel due to config type changes
- Reformatted to fit with existing standards (with apologies to Prasad).
Modified:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java?rev=1186599&r1=1186598&r2=1186599&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java Thu Oct 20 01:47:07 2011
@@ -17,33 +17,32 @@
*/
package org.apache.flume.channel;
+import java.util.LinkedList;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.LinkedList;
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
-//import org.apache.flume.channel.MemoryChannel.MemTransaction;
import org.apache.flume.conf.Configurable;
import com.google.common.base.Preconditions;
/**
- * Memory channel that with full transaction support
- * Uses transaction object for each thread (source and sink) attached to channel.
- * The events are stored in the thread safe Dequeue. * The put and take are directly
- * executed in the common queue. Channel has a marker for the last committed event in
- * order to avoid sink reading uncommitted data.
- * The transactions keep track of the actions to perform undo when rolled back.
- *
+ * Memory channel that with full transaction support Uses transaction object for
+ * each thread (source and sink) attached to channel. The events are stored in
+ * the thread safe Dequeue. * The put and take are directly executed in the
+ * common queue. Channel has a marker for the last committed event in order to
+ * avoid sink reading uncommitted data. The transactions keep track of the
+ * actions to perform undo when rolled back.
+ *
*/
public class MemoryChannel implements Channel, Configurable {
@@ -55,7 +54,7 @@ public class MemoryChannel implements Ch
private int timeStamp;
private Event event;
- public StampedEvent (int stamp, Event E) {
+ public StampedEvent(int stamp, Event E) {
timeStamp = stamp;
event = E;
}
@@ -70,12 +69,11 @@ public class MemoryChannel implements Ch
}
- /* transaction class
- * maintain a 'undo' list for put/take from the queue.
- * The rollback performs undo of the operations using these lists.
- * Also maintain a stamp/counter for commit and last take.
- * This is used to ensure that a transaction doesn't read
- * uncommitted events.
+ /*
+ * transaction class maintain a 'undo' list for put/take from the queue. The
+ * rollback performs undo of the operations using these lists. Also maintain a
+ * stamp/counter for commit and last take. This is used to ensure that a
+ * transaction doesn't read uncommitted events.
*/
public class MemTransaction implements Transaction {
private int putStamp;
@@ -84,7 +82,7 @@ public class MemoryChannel implements Ch
private LinkedList<StampedEvent> undoPutList;
private TransactionState txnState;
- public MemTransaction () {
+ public MemTransaction() {
txnState = TransactionState.Closed;
}
@@ -96,7 +94,7 @@ public class MemoryChannel implements Ch
*/
public void begin() {
undoTakeList = new LinkedList<StampedEvent>();
- undoPutList = new LinkedList<StampedEvent> ();
+ undoPutList = new LinkedList<StampedEvent>();
putStamp = 0;
takeStamp = 0;
txnState = TransactionState.Started;
@@ -109,15 +107,15 @@ public class MemoryChannel implements Ch
* commit stamp set transaction state to Committed
*/
public void commit() {
- Preconditions.checkArgument(txnState == TransactionState.Started,
+ Preconditions.checkArgument(txnState == TransactionState.Started,
"transaction not started");
// if the txn put any events, then update the channel's stamp and
// signal for availability of committed data in the queue
if (putStamp != 0) {
- lastCommitStamp.set(putStamp);
- lock.lock();
- hasData.signal();
- lock.unlock();
+ lastCommitStamp.set(putStamp);
+ lock.lock();
+ hasData.signal();
+ lock.unlock();
}
txnState = TransactionState.Committed;
undoPutList.clear();
@@ -131,7 +129,7 @@ public class MemoryChannel implements Ch
* set transaction state to rolled back
*/
public void rollback() {
- Preconditions.checkArgument(txnState == TransactionState.Started,
+ Preconditions.checkArgument(txnState == TransactionState.Started,
"transaction not started");
undoPut(this);
undoTake(this);
@@ -155,7 +153,7 @@ public class MemoryChannel implements Ch
protected int lastTakeStamp() {
return takeStamp;
}
-
+
protected void logPut(StampedEvent e, int stamp) {
undoPutList.addLast(e);
putStamp = stamp;
@@ -165,7 +163,7 @@ public class MemoryChannel implements Ch
undoTakeList.addLast(e);
takeStamp = stamp;
}
-
+
protected StampedEvent removePut() {
if (undoPutList.isEmpty()) {
return null;
@@ -186,15 +184,15 @@ public class MemoryChannel implements Ch
// The main event queue
private LinkedBlockingDeque<StampedEvent> queue;
-
+
private AtomicInteger currentStamp; // operation counter
private AtomicInteger lastCommitStamp; // counter for the last commit
private ConcurrentHashMap<Long, MemTransaction> txnMap; // open transactions
private Integer keepAlive;
final Lock lock = new ReentrantLock();
- final Condition hasData = lock.newCondition();
+ final Condition hasData = lock.newCondition();
- /**
+ /**
* Channel constructor
*/
public MemoryChannel() {
@@ -203,25 +201,29 @@ public class MemoryChannel implements Ch
txnMap = new ConcurrentHashMap<Long, MemTransaction>();
}
- /**
+ /**
* set the event queue capacity
*/
@Override
public void configure(Context context) {
+ String strCapacity = context.get("capacity", String.class);
+ Integer capacity = null;
- Integer capacity = context.get("capacity", Integer.class);
- if (capacity == null) {
+ if (strCapacity == null) {
capacity = defaultCapacity;
+ } else {
+ capacity = Integer.parseInt(strCapacity);
}
keepAlive = context.get("keep-alive", Integer.class);
+
if (keepAlive == null) {
keepAlive = defaultKeepAlive;
}
queue = new LinkedBlockingDeque<StampedEvent>(capacity);
}
-
+
@Override
/**
* Add the given event to the end of the queue
@@ -240,29 +242,29 @@ public class MemoryChannel implements Ch
StampedEvent stampedEvent = new StampedEvent(myStamp, event);
queue.put(stampedEvent);
myTxn.logPut(stampedEvent, myStamp);
-
+
} catch (InterruptedException ex) {
throw new ChannelException("Failed to put(" + event + ")", ex);
}
}
/**
- * undo of put
- * for all the events in the undoPut queue, remove those from the event queue
+ * undo of put for all the events in the undoPut queue, remove those from the
+ * event queue
+ *
* @param myTxn
*/
- protected void undoPut(MemTransaction myTxn ) {
+ protected void undoPut(MemTransaction myTxn) {
StampedEvent undoEvent;
StampedEvent currentEvent;
while ((undoEvent = myTxn.removePut()) != null) {
currentEvent = queue.removeLast();
Preconditions.checkNotNull(currentEvent, "Rollback error");
- Preconditions.checkArgument(currentEvent == undoEvent ,
- "Rollback error");
+ Preconditions.checkArgument(currentEvent == undoEvent, "Rollback error");
}
}
-
+
@Override
/**
* remove the event from the top of the queue and return it
@@ -273,7 +275,7 @@ public class MemoryChannel implements Ch
try {
MemTransaction myTxn = findTransaction();
- Preconditions.checkState(myTxn != null, "Transaction not started");
+ Preconditions.checkState(myTxn != null, "Transaction not started");
Event event = null;
int timeout = keepAlive;
@@ -289,9 +291,9 @@ public class MemoryChannel implements Ch
if (myTxn.lastTakeStamp() != lastCommitStamp.get()) {
StampedEvent e = queue.poll(timeout, TimeUnit.SECONDS);
if (e != null) {
- myTxn.logTake(e, e.getStamp());
- event = e.getEvent();
- }
+ myTxn.logTake(e, e.getStamp());
+ event = e.getEvent();
+ }
}
return event;
} catch (InterruptedException ex) {
@@ -300,8 +302,9 @@ public class MemoryChannel implements Ch
}
/**
- * undo of take operation
- * for each event in the undoTake list, add it back to the event queue
+ * undo of take operation for each event in the undoTake list, add it back to
+ * the event queue
+ *
* @param myTxn
*/
protected void undoTake(MemTransaction myTxn) {
@@ -311,7 +314,7 @@ public class MemoryChannel implements Ch
queue.addFirst(e);
}
}
-
+
@Override
/**
* Return the channel's transaction
@@ -321,7 +324,7 @@ public class MemoryChannel implements Ch
// check if there's already a transaction created for this thread
txn = findTransaction();
-
+
// Create a new transaction
if (txn == null) {
txn = new MemTransaction();
@@ -332,6 +335,7 @@ public class MemoryChannel implements Ch
/**
* Remove the given transaction from the list of open transactions
+ *
* @param myTxn
*/
protected void forgetTransaction(MemTransaction myTxn) {
@@ -339,7 +343,7 @@ public class MemoryChannel implements Ch
Preconditions.checkArgument(myTxn == currTxn, "Wrong transaction to close");
txnMap.remove(Thread.currentThread().getId());
}
-
+
// lookup the transaction for the current thread
protected MemTransaction findTransaction() {
try {
@@ -361,4 +365,3 @@ public class MemoryChannel implements Ch
return null;
}
}
-