You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/12 02:46:45 UTC
svn commit: r1156876 -
/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestChannelDriver.java
Author: esammer
Date: Fri Aug 12 00:46:45 2011
New Revision: 1156876
URL: http://svn.apache.org/viewvc?rev=1156876&view=rev
Log:
- Added ChannelDriver unit tests that mimic long blocking open() / close().
Modified:
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestChannelDriver.java
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestChannelDriver.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestChannelDriver.java?rev=1156876&r1=1156875&r2=1156876&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestChannelDriver.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestChannelDriver.java Fri Aug 12 00:46:45 2011
@@ -534,4 +534,219 @@ public class TestChannelDriver {
sinkCounters);
}
+  /**
+   * Drives a source whose open() sleeps 4.5 seconds and a sink whose close()
+   * sleeps 8.5 seconds, asserting that the driver reaches START and then STOP,
+   * that every open() / close() is counted exactly once, and that the number of
+   * source next() calls equals the number of sink append() calls.
+   */
+  @Test
+  public void testLongBlockingOpen() throws LifecycleException,
+      InterruptedException {
+
+    final EventSource delegateSource = new SequenceGeneratorSource();
+    final EventSink delegateSink = new NullSink();
+
+    final CounterGroup sourceCounters = new CounterGroup();
+    sourceCounters.setName("source");
+
+    final CounterGroup sinkCounters = new CounterGroup();
+    sinkCounters.setName("sink");
+
+    // Source wrapper: counts each call, then delegates to the real source.
+    driver.setSource(new EventSource() {
+
+      @Override
+      public void open(Context context) throws InterruptedException,
+          LifecycleException {
+        // Slow open: 4.5s, just under the 5s START wait used by this test.
+        Thread.sleep(4500);
+        sourceCounters.incrementAndGet("open");
+        delegateSource.open(context);
+      }
+
+      @Override
+      public Event<?> next(Context context) throws InterruptedException,
+          MessageDeliveryException {
+        sourceCounters.incrementAndGet("next");
+        return delegateSource.next(context);
+      }
+
+      @Override
+      public void close(Context context) throws InterruptedException,
+          LifecycleException {
+        sourceCounters.incrementAndGet("close");
+        delegateSource.close(context);
+      }
+    });
+
+    // Sink wrapper: counts each call, then delegates to the real sink.
+    driver.setSink(new EventSink() {
+
+      @Override
+      public void open(Context context) throws InterruptedException,
+          LifecycleException {
+        sinkCounters.incrementAndGet("open");
+        delegateSink.open(context);
+      }
+
+      @Override
+      public void append(Context context, Event<?> event)
+          throws InterruptedException, MessageDeliveryException {
+        sinkCounters.incrementAndGet("append");
+        delegateSink.append(context, event);
+      }
+
+      @Override
+      public void close(Context context) throws InterruptedException,
+          LifecycleException {
+        // Slow close: 8.5s, longer than the 5s STOP wait used by this test.
+        Thread.sleep(8500);
+        sinkCounters.incrementAndGet("close");
+        delegateSink.close(context);
+      }
+    });
+
+    final Context context = new Context();
+    final LifecycleState[] startOrError = { LifecycleState.START,
+        LifecycleState.ERROR };
+    final LifecycleState[] stopOrError = { LifecycleState.STOP,
+        LifecycleState.ERROR };
+
+    driver.start(context);
+    boolean reached = LifecycleController.waitForOneOf(driver, startOrError,
+        5000);
+    Assert.assertTrue(reached);
+    Assert.assertEquals(LifecycleState.START, driver.getLifecycleState());
+
+    // Pause half a second between start and stop.
+    Thread.sleep(500);
+
+    driver.stop(context);
+    reached = LifecycleController.waitForOneOf(driver, stopOrError, 5000);
+    Assert.assertTrue(reached);
+    Assert.assertEquals(LifecycleState.STOP, driver.getLifecycleState());
+
+    // Each lifecycle method must have been counted exactly once.
+    final Long once = Long.valueOf(1);
+    Assert.assertEquals(once, sourceCounters.get("open"));
+    Assert.assertEquals(once, sourceCounters.get("close"));
+    Assert.assertEquals(once, sinkCounters.get("open"));
+    Assert.assertEquals(once, sinkCounters.get("close"));
+
+    // The sink's append() count must equal the source's next() count.
+    Assert.assertEquals(
+        "Source next() events do not match sink append() events",
+        sourceCounters.get("next"), sinkCounters.get("append"));
+
+    logger.info("Source counters:{} Sink counters:{}", sourceCounters,
+        sinkCounters);
+  }
+
+  /**
+   * Drives an undelayed source and a sink whose close() sleeps 8.5 seconds,
+   * asserting that the driver reaches START and then STOP, that every open() /
+   * close() is counted exactly once, that at least one next() and one append()
+   * were counted, and that the next() and append() counts are equal.
+   */
+  @Test
+  public void testLongBlockingClose() throws LifecycleException,
+      InterruptedException {
+
+    final EventSource source = new SequenceGeneratorSource();
+    final EventSink sink = new NullSink();
+    final CounterGroup sourceCounters = new CounterGroup();
+    final CounterGroup sinkCounters = new CounterGroup();
+
+    sourceCounters.setName("source");
+    sinkCounters.setName("sink");
+
+    // Counting wrapper around the real source; no artificial delays here.
+    driver.setSource(new EventSource() {
+
+      @Override
+      public Event<?> next(Context context) throws InterruptedException,
+          MessageDeliveryException {
+        sourceCounters.incrementAndGet("next");
+        return source.next(context);
+      }
+
+      @Override
+      public void open(Context context) throws InterruptedException,
+          LifecycleException {
+        sourceCounters.incrementAndGet("open");
+        source.open(context);
+      }
+
+      @Override
+      public void close(Context context) throws InterruptedException,
+          LifecycleException {
+        sourceCounters.incrementAndGet("close");
+        source.close(context);
+      }
+    });
+
+    // Counting wrapper around the real sink; only close() is slowed down.
+    driver.setSink(new EventSink() {
+
+      @Override
+      public void append(Context context, Event<?> event)
+          throws InterruptedException, MessageDeliveryException {
+        sinkCounters.incrementAndGet("append");
+        sink.append(context, event);
+      }
+
+      @Override
+      public void open(Context context) throws InterruptedException,
+          LifecycleException {
+        sinkCounters.incrementAndGet("open");
+        sink.open(context);
+      }
+
+      @Override
+      public void close(Context context) throws InterruptedException,
+          LifecycleException {
+        // Slow close: 8.5s, longer than the 5s STOP wait used by this test.
+        Thread.sleep(8500);
+        sinkCounters.incrementAndGet("close");
+        sink.close(context);
+      }
+    });
+
+    Context context = new Context();
+    driver.start(context);
+
+    // Assert the wait result, as testLongBlockingOpen does, so that a wait
+    // which never saw START or ERROR fails at the wait itself.
+    boolean reached = LifecycleController.waitForOneOf(driver,
+        new LifecycleState[] { LifecycleState.START, LifecycleState.ERROR },
+        5000);
+
+    Assert.assertTrue(reached);
+    Assert.assertEquals(LifecycleState.START, driver.getLifecycleState());
+
+    Thread.sleep(500);
+
+    driver.stop(context);
+
+    reached = LifecycleController.waitForOneOf(driver, new LifecycleState[] {
+        LifecycleState.STOP, LifecycleState.ERROR }, 5000);
+
+    Assert.assertTrue(reached);
+    Assert.assertEquals(LifecycleState.STOP, driver.getLifecycleState());
+
+    Assert.assertEquals(Long.valueOf(1), sourceCounters.get("open"));
+    Assert.assertEquals(Long.valueOf(1), sourceCounters.get("close"));
+    Assert.assertTrue(sourceCounters.get("next") > 0);
+    Assert.assertEquals(Long.valueOf(1), sinkCounters.get("open"));
+    Assert.assertEquals(Long.valueOf(1), sinkCounters.get("close"));
+    Assert.assertTrue(sinkCounters.get("append") > 0);
+    Assert.assertEquals(
+        "Source next() events do not match sink append() events",
+        sourceCounters.get("next"), sinkCounters.get("append"));
+
+    logger.info("Source counters:{} Sink counters:{}", sourceCounters,
+        sinkCounters);
+  }
+
}