You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by pr...@apache.org on 2012/02/24 06:17:01 UTC
svn commit: r1293084 - in /incubator/flume/branches/flume-728:
flume-ng-core/src/main/java/org/apache/flume/channel/
flume-ng-core/src/test/java/org/apache/flume/channel/
flume-ng-core/src/test/java/org/apache/flume/source/
flume-ng-node/src/test/java/...
Author: prasadm
Date: Fri Feb 24 05:17:00 2012
New Revision: 1293084
URL: http://svn.apache.org/viewvc?rev=1293084&view=rev
Log:
FLUME-936: MemoryChannel is not thread safe
(Juhani Connolly via Prasad Mujumdar)
Added:
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelConcurrency.java
Modified:
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java?rev=1293084&r1=1293083&r2=1293084&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java Fri Feb 24 05:17:00 2012
@@ -17,217 +17,159 @@
*/
package org.apache.flume.channel;
-import java.util.LinkedList;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import org.apache.flume.ChannelException;
+import javax.annotation.concurrent.GuardedBy;
+
import org.apache.flume.Context;
import org.apache.flume.Event;
-import org.apache.flume.Transaction;
+import org.apache.flume.ChannelException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
-/**
- * Memory channel that with full transaction support Uses transaction object for
- * each thread (source and sink) attached to channel. The events are stored in
- * the thread safe Dequeue. * The put and take are directly executed in the
- * common queue. Channel has a marker for the last committed event in order to
- * avoid sink reading uncommitted data. The transactions keep track of the
- * actions to perform undo when rolled back.
- *
- */
-public class MemoryChannel extends AbstractChannel {
-
- private static final Integer defaultCapacity = 50;
+public class MemoryChannel extends BasicChannelSemantics {
+ private static Logger LOGGER = LoggerFactory.getLogger(MemoryChannel.class);
+ private static final Integer defaultCapacity = 100;
+ private static final Integer defaultTransCapacity = 100;
private static final Integer defaultKeepAlive = 3;
- // wrap the event with a counter
- private class StampedEvent {
- private int timeStamp;
- private Event event;
-
- public StampedEvent(int stamp, Event E) {
- timeStamp = stamp;
- event = E;
- }
-
- public int getStamp() {
- return timeStamp;
+ public class MemoryTransaction extends BasicTransactionSemantics {
+ private LinkedBlockingDeque<Event> takeList;
+ private LinkedBlockingDeque<Event> putList;
+
+ public MemoryTransaction(int transCapacity) {
+ putList = new LinkedBlockingDeque<Event>(transCapacity);
+ takeList = new LinkedBlockingDeque<Event>(transCapacity);
}
- public Event getEvent() {
- return event;
- }
-
- }
-
- /*
- * transaction class maintain a 'undo' list for put/take from the queue. The
- * rollback performs undo of the operations using these lists. Also maintain a
- * stamp/counter for commit and last take. This is used to ensure that a
- * transaction doesn't read uncommitted events.
- */
- public class MemTransaction implements Transaction {
- private int putStamp;
- private int takeStamp;
- private LinkedList<StampedEvent> undoTakeList;
- private LinkedList<StampedEvent> undoPutList;
- private TransactionState txnState;
- private int refCount;
-
- public MemTransaction() {
- txnState = TransactionState.Closed;
+ @Override
+ protected void doPut(Event event) {
+ if(!putList.offer(event)) {
+ throw new ChannelException("Put queue for MemoryTransaction of capacity " +
+ putList.size() + " full, consider committing more frequently, " +
+ "increasing capacity or increasing thread count");
+ }
}
@Override
- /**
- * Start the transaction
- * initialize the undo lists, stamps
- * set transaction state to Started
- */
- public void begin() {
- if (++refCount > 1) {
- return;
- }
- undoTakeList = new LinkedList<StampedEvent>();
- undoPutList = new LinkedList<StampedEvent>();
- putStamp = 0;
- takeStamp = 0;
- txnState = TransactionState.Started;
+ protected Event doTake() throws InterruptedException {
+ if(takeList.remainingCapacity() == 0) {
+ throw new ChannelException("Take list for MemoryTransaction, capacity " +
+ takeList.size() + " full, consider committing more frequently, " +
+ "increasing capacity, or increasing thread count");
+ }
+ if(!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
+ return null;
+ }
+ Event event;
+ synchronized(queueLock) {
+ event = queue.poll();
+ }
+ Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +
+ "signalling existence of entry");
+ takeList.put(event);
+
+ return event;
}
@Override
- /**
- * Commit the transaction
- * If there was an event added by this transaction, then set the
- * commit stamp set transaction state to Committed
- */
- public void commit() {
- Preconditions.checkArgument(txnState == TransactionState.Started,
- "transaction not started");
- if (--refCount > 0) {
- return;
- }
-
- // if the txn put any events, then update the channel's stamp and
- // signal for availability of committed data in the queue
- if (putStamp != 0) {
- lastCommitStamp.set(putStamp);
- lock.lock();
- try {
- hasData.signal();
- } finally {
- lock.unlock();
+ protected void doCommit() throws InterruptedException {
+ int remainingChange = takeList.size() - putList.size();
+ if(remainingChange < 0) {
+ if(!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
+ throw new ChannelException("Space for commit to queue couldn't be acquired" +
+ " Sinks are likely not keeping up with sources, or the buffer size is too tight");
}
}
- txnState = TransactionState.Committed;
- undoPutList.clear();
- undoTakeList.clear();
- }
+ int puts = putList.size();
+ synchronized(queueLock) {
+ if(puts > 0 ) {
+ while(!putList.isEmpty()) {
+ if(!queue.offer(putList.removeFirst())) {
+ throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
+ }
+ }
+ }
+ putList.clear();
+ takeList.clear();
+ }
+ queueStored.release(puts);
+ if(remainingChange > 0) {
+ queueRemaining.release(remainingChange);
+ }
- @Override
- /**
- * Rollback the transaction
- * execute the channel's undoXXX to undo the actions done by this txn
- * set transaction state to rolled back
- */
- public void rollback() {
- Preconditions.checkArgument(txnState == TransactionState.Started,
- "transaction not started");
- undoPut(this);
- undoTake(this);
- txnState = TransactionState.RolledBack;
- refCount = 0;
}
@Override
- /**
- * Close the transaction
- * if the transaction is still open, then roll it back
- * set transaction state to Closed
- */
- public void close() {
- if (txnState == TransactionState.Started) {
- rollback();
+ protected void doRollback() {
+ int takes = takeList.size();
+ synchronized(queueLock) {
+ Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " +
+ "queue to rollback takes. This should never happen, please report");
+ while(!takeList.isEmpty()) {
+ queue.addFirst(takeList.removeLast());
+ }
+ putList.clear();
}
- txnState = TransactionState.Closed;
- forgetTransaction(this);
+ queueStored.release(takes);
}
- public TransactionState getState() {
- return txnState;
- }
+ }
- protected int lastTakeStamp() {
- return takeStamp;
- }
+ // lock to guard queue, mainly needed to keep it locked down during resizes
+ // it should never be held through a blocking operation
+ private Integer queueLock;
- protected void logPut(StampedEvent e, int stamp) {
- undoPutList.addLast(e);
- putStamp = stamp;
- }
+ @GuardedBy(value = "queueLock")
+ private LinkedBlockingDeque<Event> queue;
- protected void logTake(StampedEvent e, int stamp) {
- undoTakeList.addLast(e);
- takeStamp = stamp;
- }
+ // invariant that tracks the amount of space remaining in the queue(with all uncommitted takeLists deducted)
+ // we maintain the remaining permits = queue.remaining - takeList.size()
+ // this allows local threads waiting for space in the queue to commit without denying access to the
+ // shared lock to threads that would make more space on the queue
+ private Semaphore queueRemaining;
+ // used to make "reservations" to grab data from the queue.
+ // by using this we can block for a while to get data without locking all other threads out
+ // like we would if we tried to use a blocking call on queue
+ private Semaphore queueStored;
+ // maximum items in a transaction queue
+ private volatile Integer transCapacity;
+ private volatile int keepAlive;
- protected StampedEvent removePut() {
- if (undoPutList.isEmpty()) {
- return null;
- } else {
- return undoPutList.removeLast();
- }
- }
- protected StampedEvent removeTake() {
- if (undoTakeList.isEmpty()) {
- return null;
- } else {
- return undoTakeList.removeLast();
- }
- }
-
- }
-
- // The main event queue
- private LinkedBlockingDeque<StampedEvent> queue;
-
- private AtomicInteger currentStamp; // operation counter
- private AtomicInteger lastCommitStamp; // counter for the last commit
- private ConcurrentHashMap<Long, MemTransaction> txnMap; // open transactions
- private Integer keepAlive;
- final Lock lock = new ReentrantLock();
- final Condition hasData = lock.newCondition();
-
- /**
- * Channel constructor
- */
public MemoryChannel() {
- currentStamp = new AtomicInteger(1);
- lastCommitStamp = new AtomicInteger(0);
- txnMap = new ConcurrentHashMap<Long, MemTransaction>();
+ super();
+ queueLock = 0;
}
- /**
- * set the event queue capacity
- */
@Override
public void configure(Context context) {
- String strCapacity = context.get("capacity", String.class);
+ String strCapacity = context.getString("capacity");
Integer capacity = null;
-
- if (strCapacity == null) {
+ if(strCapacity == null) {
capacity = defaultCapacity;
} else {
- capacity = Integer.parseInt(strCapacity);
+ try {
+ capacity = Integer.parseInt(strCapacity);
+ } catch(NumberFormatException e) {
+ capacity = defaultCapacity;
+ }
+ }
+ String strTransCapacity = context.getString("transactionCapacity");
+ if(strTransCapacity == null) {
+ transCapacity = defaultTransCapacity;
+ } else {
+ try {
+ transCapacity = Integer.parseInt(strTransCapacity);
+ } catch(NumberFormatException e) {
+ transCapacity = defaultTransCapacity;
+ }
}
+ Preconditions.checkState(transCapacity <= capacity);
String strKeepAlive = context.get("keep-alive", String.class);
@@ -237,139 +179,51 @@ public class MemoryChannel extends Abstr
keepAlive = Integer.parseInt(strKeepAlive);
}
- queue = new LinkedBlockingDeque<StampedEvent>(capacity);
- }
-
- @Override
- /**
- * Add the given event to the end of the queue
- * save the event in the undoPut queue for possible rollback
- * save the stamp of this put for commit
- */
- public void put(Event event) {
- Preconditions.checkState(queue != null,
- "No queue defined (Did you forget to configure me?");
-
- try {
- MemTransaction myTxn = findTransaction();
- Preconditions.checkState(myTxn != null, "Transaction not started");
-
- int myStamp = currentStamp.getAndIncrement();
- StampedEvent stampedEvent = new StampedEvent(myStamp, event);
- if (queue.offer(stampedEvent,keepAlive, TimeUnit.SECONDS) == false)
- throw new ChannelException("put(" + event + ") timed out");
- myTxn.logPut(stampedEvent, myStamp);
-
- } catch (InterruptedException ex) {
- throw new ChannelException("Failed to put(" + event + ")", ex);
+ if(queue != null) {
+ try {
+ resizeQueue(capacity);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ } else {
+ synchronized(queueLock) {
+ queue = new LinkedBlockingDeque<Event>(capacity);
+ queueRemaining = new Semaphore(capacity);
+ queueStored = new Semaphore(0);
+ }
}
}
- /**
- * undo of put for all the events in the undoPut queue, remove those from the
- * event queue
- *
- * @param myTxn
- */
- protected void undoPut(MemTransaction myTxn) {
- StampedEvent undoEvent;
- StampedEvent currentEvent;
-
- while ((undoEvent = myTxn.removePut()) != null) {
- currentEvent = queue.removeLast();
- Preconditions.checkNotNull(currentEvent, "Rollback error");
- Preconditions.checkArgument(currentEvent == undoEvent, "Rollback error");
+ private void resizeQueue(int capacity) throws InterruptedException {
+ int oldCapacity;
+ synchronized(queueLock) {
+ oldCapacity = queue.size() + queue.remainingCapacity();
}
- }
- @Override
- /**
- * remove the event from the top of the queue and return it
- * also add that event to undoTake queue for possible rollback
- */
- public Event take() {
- Preconditions.checkState(queue != null, "Queue not configured");
-
- try {
- MemTransaction myTxn = findTransaction();
- Preconditions.checkState(myTxn != null, "Transaction not started");
- Event event = null;
- int timeout = keepAlive;
-
- // wait for some committed data be there in the queue
- if ((timeout > 0) && (myTxn.lastTakeStamp() == lastCommitStamp.get())) {
- lock.lock();
- try {
- hasData.await(timeout, TimeUnit.SECONDS);
- } finally {
- lock.unlock();
+ if(oldCapacity == capacity) {
+ return;
+ } else if (oldCapacity > capacity) {
+ if(!queueRemaining.tryAcquire(oldCapacity - capacity, keepAlive, TimeUnit.SECONDS)) {
+ LOGGER.warn("Couldn't acquire permits to downsize the queue, resizing has been aborted");
+ } else {
+ synchronized(queueLock) {
+ LinkedBlockingDeque<Event> newQueue = new LinkedBlockingDeque<Event>(capacity);
+ newQueue.addAll(queue);
+ queue = newQueue;
}
- timeout = 0; // don't wait any further
}
-
- // don't go past the last committed element
- if (myTxn.lastTakeStamp() != lastCommitStamp.get()) {
- StampedEvent e = queue.poll(timeout, TimeUnit.SECONDS);
- if (e != null) {
- myTxn.logTake(e, e.getStamp());
- event = e.getEvent();
- }
+ } else {
+ synchronized(queueLock) {
+ LinkedBlockingDeque<Event> newQueue = new LinkedBlockingDeque<Event>(capacity);
+ newQueue.addAll(queue);
+ queue = newQueue;
}
- return event;
- } catch (InterruptedException ex) {
- throw new ChannelException("Failed to take()", ex);
- }
- }
-
- /**
- * undo of take operation for each event in the undoTake list, add it back to
- * the event queue
- *
- * @param myTxn
- */
- protected void undoTake(MemTransaction myTxn) {
- StampedEvent e;
-
- while ((e = myTxn.removeTake()) != null) {
- queue.addFirst(e);
+ queueRemaining.release(capacity - oldCapacity);
}
}
@Override
- /**
- * Return the channel's transaction
- */
- public Transaction getTransaction() {
- MemTransaction txn;
-
- // check if there's already a transaction created for this thread
- txn = findTransaction();
-
- // Create a new transaction
- if (txn == null) {
- txn = new MemTransaction();
- txnMap.put(Thread.currentThread().getId(), txn);
- }
- return txn;
- }
-
- /**
- * Remove the given transaction from the list of open transactions
- *
- * @param myTxn
- */
- protected void forgetTransaction(MemTransaction myTxn) {
- MemTransaction currTxn = findTransaction();
- Preconditions.checkArgument(myTxn == currTxn, "Wrong transaction to close");
- txnMap.remove(Thread.currentThread().getId());
- }
-
- // lookup the transaction for the current thread
- protected MemTransaction findTransaction() {
- try {
- return txnMap.get(Thread.currentThread().getId());
- } catch (NullPointerException eN) {
- return null;
- }
+ protected BasicTransactionSemantics createTransaction() {
+ return new MemoryTransaction(transCapacity);
}
}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java?rev=1293084&r1=1293083&r2=1293084&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java Fri Feb 24 05:17:00 2012
@@ -19,7 +19,11 @@
package org.apache.flume.channel;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
@@ -52,6 +56,7 @@ public class TestMemoryChannel {
transaction.begin();
channel.put(event);
transaction.commit();
+ transaction.close();
transaction = channel.getTransaction();
Assert.assertNotNull(transaction);
@@ -62,4 +67,169 @@ public class TestMemoryChannel {
transaction.commit();
}
+ @Test
+ public void testChannelResize() {
+ Context context = new Context();
+ Map<String, Object> parms = new HashMap<String, Object>();
+ parms.put("capacity", "5");
+ parms.put("transactionCapacity", "5");
+ context.setParameters(parms);
+ Configurables.configure(channel, context);
+
+ Transaction transaction = channel.getTransaction();
+ transaction.begin();
+ for(int i=0; i < 5; i++) {
+ channel.put(EventBuilder.withBody(String.format("test event %d", i).getBytes()));
+ }
+ transaction.commit();
+ transaction.close();
+
+ /*
+ * Verify overflow semantics
+ */
+ transaction = channel.getTransaction();
+ boolean overflowed = false;
+ try {
+ transaction.begin();
+ channel.put(EventBuilder.withBody("overflow event".getBytes()));
+ transaction.commit();
+ } catch (ChannelException e) {
+ overflowed = true;
+ transaction.rollback();
+ } finally {
+ transaction.close();
+ }
+ Assert.assertTrue(overflowed);
+
+ /*
+ * Reconfigure capacity up (from 5 to 6) and add another event, shouldn't result in exception
+ */
+ parms.put("capacity", "6");
+ context.setParameters(parms);
+ Configurables.configure(channel, context);
+ transaction = channel.getTransaction();
+ transaction.begin();
+ channel.put(EventBuilder.withBody("extended capacity event".getBytes()));
+ transaction.commit();
+ transaction.close();
+
+ /*
+ * Attempt to reconfigure capacity to below current entry count and verify
+ * it wasn't carried out
+ */
+ parms.put("capacity", "2");
+ parms.put("transactionCapacity", "2");
+ context.setParameters(parms);
+ Configurables.configure(channel, context);
+ for(int i=0; i < 6; i++) {
+ transaction = channel.getTransaction();
+ transaction.begin();
+ Assert.assertNotNull(channel.take());
+ transaction.commit();
+ transaction.close();
+ }
+ }
+
+ @Test(expected=ChannelException.class)
+ public void testTransactionPutCapacityOverload() {
+ Context context = new Context();
+ Map<String, Object> parms = new HashMap<String, Object>();
+ parms.put("capacity", "5");
+ parms.put("transactionCapacity", "2");
+ context.setParameters(parms);
+ Configurables.configure(channel, context);
+
+ Transaction transaction = channel.getTransaction();
+ transaction.begin();
+ channel.put(EventBuilder.withBody("test".getBytes()));
+ channel.put(EventBuilder.withBody("test".getBytes()));
+ // shouldn't be able to fit a third in the buffer
+ channel.put(EventBuilder.withBody("test".getBytes()));
+ Assert.fail();
+ }
+
+ @Test(expected=ChannelException.class)
+ public void testCapacityOverload() {
+ Context context = new Context();
+ Map<String, Object> parms = new HashMap<String, Object>();
+ parms.put("capacity", "5");
+ parms.put("transactionCapacity", "3");
+ context.setParameters(parms);
+ Configurables.configure(channel, context);
+
+ Transaction transaction = channel.getTransaction();
+ transaction.begin();
+ channel.put(EventBuilder.withBody("test".getBytes()));
+ channel.put(EventBuilder.withBody("test".getBytes()));
+ channel.put(EventBuilder.withBody("test".getBytes()));
+ transaction.commit();
+ transaction.close();
+
+ transaction = channel.getTransaction();
+ transaction.begin();
+ channel.put(EventBuilder.withBody("test".getBytes()));
+ channel.put(EventBuilder.withBody("test".getBytes()));
+ channel.put(EventBuilder.withBody("test".getBytes()));
+ // this should kill it
+ transaction.commit();
+ Assert.fail();
+ }
+
+ @Test
+ public void testBufferEmptyingAfterTakeCommit() {
+ Context context = new Context();
+ Map<String, Object> parms = new HashMap<String, Object>();
+ parms.put("capacity", "3");
+ parms.put("transactionCapacity", "3");
+ context.setParameters(parms);
+ Configurables.configure(channel, context);
+
+ Transaction tx = channel.getTransaction();
+ tx.begin();
+ channel.put(EventBuilder.withBody("test".getBytes()));
+ channel.put(EventBuilder.withBody("test".getBytes()));
+ channel.put(EventBuilder.withBody("test".getBytes()));
+ tx.commit();
+ tx.close();
+
+ tx = channel.getTransaction();
+ tx.begin();
+ channel.take();
+ channel.take();
+ tx.commit();
+ tx.close();
+
+ tx = channel.getTransaction();
+ tx.begin();
+ channel.put(EventBuilder.withBody("test".getBytes()));
+ channel.put(EventBuilder.withBody("test".getBytes()));
+ tx.commit();
+ tx.close();
+ }
+
+ @Test
+ public void testBufferEmptyingAfterRollback() {
+ Context context = new Context();
+ Map<String, Object> parms = new HashMap<String, Object>();
+ parms.put("capacity", "3");
+ parms.put("transactionCapacity", "3");
+ context.setParameters(parms);
+ Configurables.configure(channel, context);
+
+ Transaction tx = channel.getTransaction();
+ tx.begin();
+ channel.put(EventBuilder.withBody("test".getBytes()));
+ channel.put(EventBuilder.withBody("test".getBytes()));
+ channel.put(EventBuilder.withBody("test".getBytes()));
+ tx.rollback();
+ tx.close();
+
+ tx = channel.getTransaction();
+ tx.begin();
+ channel.put(EventBuilder.withBody("test".getBytes()));
+ channel.put(EventBuilder.withBody("test".getBytes()));
+ channel.put(EventBuilder.withBody("test".getBytes()));
+ tx.commit();
+ tx.close();
+ }
}
Added: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelConcurrency.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelConcurrency.java?rev=1293084&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelConcurrency.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelConcurrency.java Fri Feb 24 05:17:00 2012
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.channel;
+
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestMemoryChannelConcurrency {
+
+ private CyclicBarrier barrier;
+
+ @Before
+ public void setUp() {
+ }
+
+ @Test
+ public void testTransactionConcurrency() throws InterruptedException {
+ final Channel channel = new MemoryChannel();
+ barrier = new CyclicBarrier(2);
+
+ Configurables.configure(channel, new Context());
+ Thread t1 = new Thread(new Runnable() {
+ public void run() {
+ Transaction tx = channel.getTransaction();
+ tx.begin();
+ channel.put(EventBuilder.withBody("first event".getBytes()));
+ try {
+ barrier.await();
+ barrier.await();
+ tx.rollback();
+
+ barrier.await();
+ tx.close();
+ // final barrier to make sure both threads manage to finish
+ barrier.await();
+ } catch (InterruptedException e) {
+ Assert.fail();
+ } catch (BrokenBarrierException e) {
+ Assert.fail();
+ }
+ }
+ });
+
+ Thread t2 = new Thread(new Runnable() {
+ public void run() {
+ Transaction tx = channel.getTransaction();
+ try {
+ barrier.await();
+ tx.begin();
+ channel.put(EventBuilder.withBody("second event".getBytes()));
+ barrier.await();
+ barrier.await();
+ tx.commit();
+ tx.close();
+ // final barrier to make sure both threads manage to finish
+ barrier.await();
+ } catch (InterruptedException e) {
+ Assert.fail();
+ } catch (BrokenBarrierException e) {
+ Assert.fail();
+ }
+ }
+ });
+ t1.start();
+ t2.start();
+
+ t1.join(1000);
+ if (t1.isAlive()) {
+ Assert.fail("Thread1 failed to finish");
+ t1.interrupt();
+ }
+
+ t2.join(1000);
+ if (t2.isAlive()) {
+ Assert.fail("Thread2 failed to finish");
+ t2.interrupt();
+ }
+
+ Transaction tx = channel.getTransaction();
+ tx.begin();
+ Event e = channel.take();
+ Assert.assertEquals("second event", new String(e.getBody()));
+ Assert.assertNull(channel.take());
+ tx.commit();
+ tx.close();
+ }
+
+ /**
+ * Uses start-gate/end-gate latches to make sure all threads run at the same time. Threads randomly
+ * choose to commit or rollback random numbers of actions, tagging them with the thread no.
+ * The correctness check is made by recording committed entries into a map, and verifying the count
+ * after the endgate
+ * Since nothing is taking the puts out, allow for a big capacity
+ *
+ * @throws InterruptedException
+ */
+ @Test
+ public void testManyThreads() throws InterruptedException {
+ final Channel channel = new MemoryChannel();
+ Context context = new Context();
+ context.put("keep-alive", "1");
+ context.put("capacity", "5000"); // theoretical maximum of 100 threads * 10 * 5
+ // because we're just grabbing the whole lot in one commit
+ // normally a transactionCapacity significantly lower than the channel capacity would be recommended
+ context.put("transactionCapacity", "5000");
+ Configurables.configure(channel, context);
+ final ConcurrentHashMap<String, AtomicInteger> committedPuts =
+ new ConcurrentHashMap<String, AtomicInteger>();
+
+ final int threadCount = 100;
+ final CountDownLatch startGate = new CountDownLatch(1);
+ final CountDownLatch endGate = new CountDownLatch(threadCount);
+
+ for (int i = 0; i < threadCount; i++) {
+ Thread t = new Thread() {
+ public void run() {
+ Long tid = Thread.currentThread().getId();
+ String strtid = tid.toString();
+ Random rng = new Random(tid);
+
+ try {
+ startGate.await();
+ } catch (InterruptedException e1) {
+ Thread.currentThread().interrupt();
+ }
+ for(int j = 0; j < 10; j++) {
+ int events = rng.nextInt(5) + 1;
+ Transaction tx = channel.getTransaction();
+ tx.begin();
+ for(int k = 0; k < events; k++) {
+ channel.put(EventBuilder.withBody(strtid.getBytes()));
+ }
+ if(rng.nextBoolean()) {
+ tx.commit();
+ AtomicInteger tcount = committedPuts.get(strtid);
+ if(tcount == null) {
+ committedPuts.put(strtid, new AtomicInteger(events));
+ } else {
+ tcount.addAndGet(events);
+ }
+ } else {
+ tx.rollback();
+ }
+ tx.close();
+ }
+ endGate.countDown();
+ }
+ };
+ t.start();
+ }
+ startGate.countDown();
+ endGate.await();
+
+ if(committedPuts.isEmpty()) {
+ Assert.fail();
+ }
+
+ // verify the counts
+ Transaction tx = channel.getTransaction();
+ tx.begin();
+ Event e;
+ while((e = channel.take()) != null) {
+ String index = new String(e.getBody());
+ AtomicInteger remain = committedPuts.get(index);
+ int post = remain.decrementAndGet();
+ if(post == 0) {
+ committedPuts.remove(index);
+ }
+ }
+ tx.commit();
+ tx.close();
+ if(!committedPuts.isEmpty()) {
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testConcurrentSinksAndSources() throws InterruptedException {
+ final Channel channel = new MemoryChannel();
+ Context context = new Context();
+ context.put("keep-alive", "1");
+ context.put("capacity", "100"); // theoretical maximum of 100 threads * 10 * 5
+ // because we're just grabbing the whole lot in one commit
+ // normally a transactionCapacity significantly lower than the channel capacity would be recommended
+ context.put("transactionCapacity", "100");
+ Configurables.configure(channel, context);
+ final ConcurrentHashMap<String, AtomicInteger> committedPuts = new ConcurrentHashMap<String, AtomicInteger>();
+ final ConcurrentHashMap<String, AtomicInteger> committedTakes =
+ new ConcurrentHashMap<String, AtomicInteger>();
+
+ final int threadCount = 100;
+ final CountDownLatch startGate = new CountDownLatch(1);
+ final CountDownLatch endGate = new CountDownLatch(threadCount);
+
+ // start a sink and source for each
+ for (int i = 0; i < threadCount/2; i++) {
+ Thread t = new Thread() {
+ public void run() {
+ Long tid = Thread.currentThread().getId();
+ String strtid = tid.toString();
+ Random rng = new Random(tid);
+
+ try {
+ startGate.await();
+ } catch (InterruptedException e1) {
+ Thread.currentThread().interrupt();
+ }
+ for(int j = 0; j < 10; j++) {
+ int events = rng.nextInt(5) + 1;
+ Transaction tx = channel.getTransaction();
+ tx.begin();
+ for(int k = 0; k < events; k++) {
+ channel.put(EventBuilder.withBody(strtid.getBytes()));
+ }
+ if(rng.nextBoolean()) {
+ try {
+ tx.commit();
+ AtomicInteger tcount = committedPuts.get(strtid);
+ if(tcount == null) {
+ committedPuts.put(strtid, new AtomicInteger(events));
+ } else {
+ tcount.addAndGet(events);
+ }
+ } catch(ChannelException e) {
+ System.out.print("puts commit failed");
+ tx.rollback();
+ }
+ } else {
+ tx.rollback();
+ }
+ tx.close();
+ }
+ endGate.countDown();
+ }
+ };
+ // start source
+ t.start();
+ final Integer takeMapLock = 0;
+ t = new Thread() {
+ public void run() {
+ Random rng = new Random(Thread.currentThread().getId());
+
+ try {
+ startGate.await();
+ } catch (InterruptedException e1) {
+ Thread.currentThread().interrupt();
+ }
+ for(int j = 0; j < 10; j++) {
+ int events = rng.nextInt(5) + 1;
+ Transaction tx = channel.getTransaction();
+ tx.begin();
+ Event[] taken = new Event[events];
+ int k;
+ for(k = 0; k < events; k++) {
+ taken[k] = channel.take();
+ if(taken[k] == null) break;
+ }
+ if(rng.nextBoolean()) {
+ try {
+ tx.commit();
+ for(Event e : taken) {
+ if(e == null) break;
+ String index = new String(e.getBody());
+ synchronized(takeMapLock) {
+ AtomicInteger remain = committedTakes.get(index);
+ if(remain == null) {
+ committedTakes.put(index, new AtomicInteger(1));
+ } else {
+ remain.incrementAndGet();
+ }
+ }
+ }
+ } catch (ChannelException e) {
+ System.out.print("takes commit failed");
+ tx.rollback();
+ }
+ } else {
+ tx.rollback();
+ }
+ tx.close();
+ }
+ endGate.countDown();
+ }
+ };
+ // start sink
+ t.start();
+ }
+ startGate.countDown();
+ if(!endGate.await(20, TimeUnit.SECONDS)) {
+ Assert.fail("Not all threads ended succesfully");
+ }
+
+ // verify the counts
+ Transaction tx = channel.getTransaction();
+ tx.begin();
+ Event e;
+ // first pull out what's left in the channel and remove it from the
+ // committed map
+ while((e = channel.take()) != null) {
+ String index = new String(e.getBody());
+ AtomicInteger remain = committedPuts.get(index);
+ int post = remain.decrementAndGet();
+ if(post == 0) {
+ committedPuts.remove(index);
+ }
+ }
+ tx.commit();
+ tx.close();
+
+ // now just check the committed puts match the committed takes
+ for(Entry<String, AtomicInteger> takes : committedTakes.entrySet()) {
+ AtomicInteger count = committedPuts.get(takes.getKey());
+ if(count == null)
+ Assert.fail("Putted data doesn't exist");
+ if(count.get() != takes.getValue().get())
+ Assert.fail(String.format("Mismatched put and take counts expected %d had %d", count.get(), takes.getValue().get()));
+ committedPuts.remove(takes.getKey());
+ }
+ if(!committedPuts.isEmpty()) Assert.fail("Puts still has entries remaining");
+ }
+}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java?rev=1293084&r1=1293083&r2=1293084&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java Fri Feb 24 05:17:00 2012
@@ -25,11 +25,12 @@ import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.Transaction.TransactionState;
-import org.apache.flume.channel.MemoryChannel.MemTransaction;
+import org.apache.flume.channel.MemoryChannel.MemoryTransaction;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
public class TestMemoryChannelTransaction {
@@ -49,6 +50,8 @@ public class TestMemoryChannelTransactio
int putCounter = 0;
context.put("keep-alive", "1");
+ context.put("capacity", "100");
+ context.put("transactionCapacity", "50");
Configurables.configure(channel, context);
Transaction transaction = channel.getTransaction();
@@ -152,6 +155,7 @@ public class TestMemoryChannelTransactio
transaction.close();
}
+ @Ignore("BasicChannelSemantics doesn't support re-entrant transactions")
@Test
public void testReEntTxn() throws InterruptedException,
EventDeliveryException {
@@ -172,12 +176,8 @@ public class TestMemoryChannelTransactio
event = EventBuilder.withBody(("test event" + putCounter).getBytes());
channel.put(event);
transaction.commit(); // inner commit
- Assert.assertEquals(((MemTransaction) transaction).getState(),
- TransactionState.Started);
}
transaction.commit();
- Assert.assertEquals(((MemTransaction) transaction).getState(),
- TransactionState.Committed);
transaction.close();
transaction = channel.getTransaction();
@@ -197,6 +197,7 @@ public class TestMemoryChannelTransactio
transaction.close();
}
+ @Ignore("BasicChannelSemantics doesn't support re-entrant transactions")
@Test
public void testReEntTxnRollBack() throws InterruptedException,
EventDeliveryException {
@@ -248,15 +249,11 @@ public class TestMemoryChannelTransactio
Assert.assertNotNull("lost an event", event2);
Assert.assertArrayEquals(event2.getBody(), ("test event" + i).getBytes());
transaction.commit(); // inner commit
- Assert.assertEquals(((MemTransaction) transaction).getState(),
- TransactionState.Started);
}
event2 = channel.take();
Assert.assertNull("extra event found", event2);
transaction.rollback();
- Assert.assertEquals(((MemTransaction) transaction).getState(),
- TransactionState.RolledBack);
transaction.close();
// verify that the events were left in there due to rollback
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java?rev=1293084&r1=1293083&r2=1293084&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java Fri Feb 24 05:17:00 2012
@@ -58,6 +58,9 @@ public class TestExecSource {
Context context = new Context();
context.put("command", "cat /etc/passwd");
+ context.put("keep-alive", "1");
+ context.put("capacity", "1000");
+ context.put("transactionCapacity", "1000");
Configurables.configure(source, context);
Configurables.configure(channel, context);
Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java?rev=1293084&r1=1293083&r2=1293084&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java Fri Feb 24 05:17:00 2012
@@ -57,7 +57,6 @@ public class TestNetcatSource {
source = new NetcatSource();
Context context = new Context();
- context.put("capacity", "50");
Configurables.configure(channel, context);
List<Channel> channels = new ArrayList<Channel>();