You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/12 02:46:33 UTC

svn commit: r1156871 - in /incubator/flume/branches/flume-728/flume-ng-core/src: main/java/org/apache/flume/core/ChannelDriver.java test/java/org/apache/flume/core/TestChannelDriver.java

Author: esammer
Date: Fri Aug 12 00:46:33 2011
New Revision: 1156871

URL: http://svn.apache.org/viewvc?rev=1156871&view=rev
Log:
- Fixed log messages. SLF4J doesn't do the right thing with log(string, object, throwable).
- Fixed a bug (in a really gross way) where we didn't close a sink if a source failed to open.
- Improved ChannelDriver unit tests to ensure the number of events from a source matches those successfully delivered to the sink.
- Added tests for both flakey next and append methods in the source and sink respectively.

Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestChannelDriver.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java?rev=1156871&r1=1156870&r2=1156871&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/core/ChannelDriver.java Fri Aug 12 00:46:33 2011
@@ -145,13 +145,13 @@ public class ChannelDriver implements Li
       try {
         sink.open(context);
       } catch (InterruptedException e) {
-        logger.debug("Interrupted while opening source / sink.", e);
+        logger.debug("Interrupted while opening sink. Exception follows.", e);
         lastException = e;
         lifecycleState = LifecycleState.ERROR;
         shouldStop = true;
         return;
       } catch (LifecycleException e) {
-        logger.error("Failed to open source / sink. Exception follows.", e);
+        logger.error("Failed to open sink. Exception follows.", e);
         lastException = e;
         lifecycleState = LifecycleState.ERROR;
         shouldStop = true;
@@ -161,16 +161,41 @@ public class ChannelDriver implements Li
       try {
         source.open(context);
       } catch (InterruptedException e) {
-        logger.debug("Interrupted while opening source:{}", source, e);
+        logger.debug("Interrupted while opening source. Exception follows.", e);
         lastException = e;
         lifecycleState = LifecycleState.ERROR;
         shouldStop = true;
+
+        /* FIXME: Factor this out. The sink was opened above; close it first. */
+        try {
+          sink.close(context);
+        } catch (InterruptedException e1) {
+          Thread.currentThread().interrupt();
+        } catch (LifecycleException e1) {
+          logger
+              .error(
+                  "While cleaning up after \"{}\" failed to close the sink down - {}",
+                  e.getMessage(), e1.toString());
+        }
+
         return;
       } catch (LifecycleException e) {
-        logger.error("Failed to open source:{} Exception follows.", source, e);
+        logger.error("Failed to open source. Exception follows.", e);
         lastException = e;
         lifecycleState = LifecycleState.ERROR;
         shouldStop = true;
+        /* Same sink cleanup: e is the open failure, e1 the close failure. */
+        try {
+          sink.close(context);
+        } catch (InterruptedException e1) {
+          Thread.currentThread().interrupt();
+        } catch (LifecycleException e1) {
+          logger
+              .error(
+                  "While cleaning up after \"{}\" failed to close the sink down - {}",
+                  e.getMessage(), e1.toString());
+        }
+
         return;
       }
 
@@ -192,7 +217,8 @@ public class ChannelDriver implements Li
           lifecycleState = LifecycleState.ERROR;
           shouldStop = true;
         } catch (MessageDeliveryException e) {
-          logger.debug("Unable to deliver event:{} (may be null)", event);
+          logger.debug("Unable to deliver event:{} (may be null) - Reason:{}",
+              event, e.getMessage());
           discardedEvents++;
           /* FIXME: Handle dead messages. */
         }
@@ -203,11 +229,11 @@ public class ChannelDriver implements Li
       try {
         source.close(context);
       } catch (InterruptedException e) {
-        logger.debug("Interrupted while closing source:{}.", source, e);
+        logger.debug("Interrupted while closing source. Exception follows.", e);
         lastException = e;
         lifecycleState = LifecycleState.ERROR;
       } catch (LifecycleException e) {
-        logger.error("Failed to close source:{} Exception follows.", source, e);
+        logger.error("Failed to close source. Exception follows.", e);
         lastException = e;
         lifecycleState = LifecycleState.ERROR;
       }
@@ -215,11 +241,11 @@ public class ChannelDriver implements Li
       try {
         sink.close(context);
       } catch (InterruptedException e) {
-        logger.debug("Interrupted while closing sink:{}.", sink, e);
+        logger.debug("Interrupted while closing sink. Exception follows.", e);
         lastException = e;
         lifecycleState = LifecycleState.ERROR;
       } catch (LifecycleException e) {
-        logger.error("Failed to close sink:{} Exception follows.", sink, e);
+        logger.error("Failed to close sink. Exception follows.", e);
         lastException = e;
         lifecycleState = LifecycleState.ERROR;
       }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestChannelDriver.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestChannelDriver.java?rev=1156871&r1=1156870&r2=1156871&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestChannelDriver.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/core/TestChannelDriver.java Fri Aug 12 00:46:33 2011
@@ -119,6 +119,9 @@ public class TestChannelDriver {
     Assert.assertEquals(Long.valueOf(1), sinkCounters.get("open"));
     Assert.assertEquals(Long.valueOf(1), sinkCounters.get("close"));
     Assert.assertTrue(sinkCounters.get("append") > 0);
+    Assert.assertEquals(
+        "Source next() events didn't match sink append() events",
+        sourceCounters.get("next"), sinkCounters.get("append"));
 
     logger.info("Source counters:{} Sink counters:{}", sourceCounters,
         sinkCounters);
@@ -214,6 +217,318 @@ public class TestChannelDriver {
     Assert.assertEquals(Long.valueOf(0), sinkCounters.get("open"));
     Assert.assertEquals(Long.valueOf(0), sinkCounters.get("close"));
     Assert.assertEquals(Long.valueOf(0), sinkCounters.get("append"));
+    Assert.assertEquals(
+        "Source next() events do not match sink append() events",
+        sourceCounters.get("next"), sinkCounters.get("append"));
+
+    logger.info("Source counters:{} Sink counters:{}", sourceCounters,
+        sinkCounters);
+  }
+
+  @Test
+  public void testFailedClose() throws LifecycleException, InterruptedException {
+    final EventSource source = new SequenceGeneratorSource();
+    final EventSink sink = new NullSink();
+    final CounterGroup sourceCounters = new CounterGroup();
+    final CounterGroup sinkCounters = new CounterGroup();
+    /* Counting wrappers around source and sink whose close() always throws. */
+    sourceCounters.setName("source");
+    sinkCounters.setName("sink");
+
+    driver.setSource(new EventSource() {
+
+      @Override
+      public Event<?> next(Context context) throws InterruptedException,
+          MessageDeliveryException {
+
+        sourceCounters.incrementAndGet("next");
+
+        return source.next(context);
+      }
+
+      @Override
+      public void open(Context context) throws InterruptedException,
+          LifecycleException {
+
+        sourceCounters.incrementAndGet("open");
+
+        source.open(context);
+      }
+
+      @Override
+      public void close(Context context) throws InterruptedException,
+          LifecycleException {
+        /* Always fails; the "close" counter is never incremented here. */
+        throw new LifecycleException("Close failed because I said so!");
+      }
+    });
+
+    driver.setSink(new EventSink() {
+
+      @Override
+      public void append(Context context, Event<?> event)
+          throws InterruptedException, MessageDeliveryException {
+
+        sinkCounters.incrementAndGet("append");
+
+        sink.append(context, event);
+      }
+
+      @Override
+      public void open(Context context) throws InterruptedException,
+          LifecycleException {
+
+        sinkCounters.incrementAndGet("open");
+
+        sink.open(context);
+      }
+
+      @Override
+      public void close(Context context) throws InterruptedException,
+          LifecycleException {
+        /* Always fails; the "close" counter is never incremented here. */
+        throw new LifecycleException("Close failed because I said so!");
+      }
+    });
+
+    Context context = new Context();
+
+    driver.start(context);
+
+    LifecycleController.waitForOneOf(driver, new LifecycleState[] {
+        LifecycleState.START, LifecycleState.ERROR }, 5000);
+
+    Assert.assertEquals(LifecycleState.START, driver.getLifecycleState());
+    /* Presumably gives the driver time to move some events before stop. */
+    Thread.sleep(500);
+
+    driver.stop(context);
+
+    LifecycleController.waitForOneOf(driver, new LifecycleState[] {
+        LifecycleState.STOP, LifecycleState.ERROR }, 5000);
+    /* With both close() calls throwing, the driver must end in ERROR. */
+    Assert.assertEquals(LifecycleState.ERROR, driver.getLifecycleState());
+    /* "close" is expected to be 0: the failing overrides never count it. */
+    Assert.assertEquals(Long.valueOf(1), sourceCounters.get("open"));
+    Assert.assertEquals(Long.valueOf(0), sourceCounters.get("close"));
+    Assert.assertTrue(sourceCounters.get("next") > 0);
+    Assert.assertEquals(Long.valueOf(1), sinkCounters.get("open"));
+    Assert.assertEquals(Long.valueOf(0), sinkCounters.get("close"));
+    Assert.assertTrue(sinkCounters.get("append") > 0);
+    Assert.assertEquals(
+        "Source next() events do not match sink append() events",
+        sourceCounters.get("next"), sinkCounters.get("append"));
+
+    logger.info("Source counters:{} Sink counters:{}", sourceCounters,
+        sinkCounters);
+  }
+
+  @Test
+  public void testFlakeyNext() throws LifecycleException, InterruptedException {
+    final EventSource source = new SequenceGeneratorSource();
+    final EventSink sink = new NullSink();
+    final CounterGroup sourceCounters = new CounterGroup();
+    final CounterGroup sinkCounters = new CounterGroup();
+    /* Counting wrappers; the wrapped source's next() fails at random. */
+    sourceCounters.setName("source");
+    sinkCounters.setName("sink");
+
+    driver.setSource(new EventSource() {
+
+      @Override
+      public Event<?> next(Context context) throws InterruptedException,
+          MessageDeliveryException {
+        /* Math.round(Math.random()) is 0 or 1, so about half the calls fail. */
+        if (Math.round(Math.random()) == 0) {
+          throw new MessageDeliveryException("I don't feel like working.");
+        }
+        /* Only calls that get past the failure check are counted. */
+        sourceCounters.incrementAndGet("next");
+
+        return source.next(context);
+      }
+
+      @Override
+      public void open(Context context) throws InterruptedException,
+          LifecycleException {
+
+        sourceCounters.incrementAndGet("open");
+
+        source.open(context);
+      }
+
+      @Override
+      public void close(Context context) throws InterruptedException,
+          LifecycleException {
+
+        sourceCounters.incrementAndGet("close");
+
+        source.close(context);
+      }
+    });
+
+    driver.setSink(new EventSink() {
+
+      @Override
+      public void append(Context context, Event<?> event)
+          throws InterruptedException, MessageDeliveryException {
+
+        sinkCounters.incrementAndGet("append");
+
+        sink.append(context, event);
+      }
+
+      @Override
+      public void open(Context context) throws InterruptedException,
+          LifecycleException {
+
+        sinkCounters.incrementAndGet("open");
+
+        sink.open(context);
+      }
+
+      @Override
+      public void close(Context context) throws InterruptedException,
+          LifecycleException {
+
+        sinkCounters.incrementAndGet("close");
+
+        sink.close(context);
+      }
+    });
+
+    Context context = new Context();
+
+    driver.start(context);
+
+    LifecycleController.waitForOneOf(driver, new LifecycleState[] {
+        LifecycleState.START, LifecycleState.ERROR }, 5000);
+
+    Assert.assertEquals(LifecycleState.START, driver.getLifecycleState());
+
+    Thread.sleep(500);
+
+    driver.stop(context);
+
+    LifecycleController.waitForOneOf(driver, new LifecycleState[] {
+        LifecycleState.STOP, LifecycleState.ERROR }, 5000);
+    /* Failed next() calls must not push the driver into ERROR. */
+    Assert.assertEquals(LifecycleState.STOP, driver.getLifecycleState());
+    /* Each counted next() is expected to have produced one append(). */
+    Assert.assertEquals(Long.valueOf(1), sourceCounters.get("open"));
+    Assert.assertEquals(Long.valueOf(1), sourceCounters.get("close"));
+    Assert.assertTrue(sourceCounters.get("next") > 0);
+    Assert.assertEquals(Long.valueOf(1), sinkCounters.get("open"));
+    Assert.assertEquals(Long.valueOf(1), sinkCounters.get("close"));
+    Assert.assertTrue(sinkCounters.get("append") > 0);
+    Assert.assertEquals(
+        "Source next() events do not match sink append() events",
+        sourceCounters.get("next"), sinkCounters.get("append"));
+
+    logger.info("Source counters:{} Sink counters:{}", sourceCounters,
+        sinkCounters);
+  }
+
+  @Test
+  public void testFlakeyAppend() throws LifecycleException,
+      InterruptedException {
+
+    final EventSource source = new SequenceGeneratorSource();
+    final EventSink sink = new NullSink();
+    final CounterGroup sourceCounters = new CounterGroup();
+    final CounterGroup sinkCounters = new CounterGroup();
+
+    sourceCounters.setName("source");
+    sinkCounters.setName("sink");
+
+    driver.setSource(new EventSource() {
+
+      @Override
+      public Event<?> next(Context context) throws InterruptedException,
+          MessageDeliveryException {
+
+        sourceCounters.incrementAndGet("next");
+
+        return source.next(context);
+      }
+
+      @Override
+      public void open(Context context) throws InterruptedException,
+          LifecycleException {
+
+        sourceCounters.incrementAndGet("open");
+
+        source.open(context);
+      }
+
+      @Override
+      public void close(Context context) throws InterruptedException,
+          LifecycleException {
+
+        sourceCounters.incrementAndGet("close");
+
+        source.close(context);
+      }
+    });
+
+    driver.setSink(new EventSink() {
+
+      @Override
+      public void append(Context context, Event<?> event)
+          throws InterruptedException, MessageDeliveryException {
+
+        if (Math.round(Math.random()) == 0) {
+          throw new MessageDeliveryException("I don't feel like working.");
+        }
+
+        sinkCounters.incrementAndGet("append");
+
+        sink.append(context, event);
+      }
+
+      @Override
+      public void open(Context context) throws InterruptedException,
+          LifecycleException {
+
+        sinkCounters.incrementAndGet("open");
+
+        sink.open(context);
+      }
+
+      @Override
+      public void close(Context context) throws InterruptedException,
+          LifecycleException {
+
+        sinkCounters.incrementAndGet("close");
+
+        sink.close(context);
+      }
+    });
+
+    Context context = new Context();
+
+    driver.start(context);
+
+    LifecycleController.waitForOneOf(driver, new LifecycleState[] {
+        LifecycleState.START, LifecycleState.ERROR }, 5000);
+
+    Assert.assertEquals(LifecycleState.START, driver.getLifecycleState());
+
+    Thread.sleep(500);
+
+    driver.stop(context);
+
+    LifecycleController.waitForOneOf(driver, new LifecycleState[] {
+        LifecycleState.STOP, LifecycleState.ERROR }, 5000);
+
+    Assert.assertEquals(LifecycleState.STOP, driver.getLifecycleState());
+
+    Assert.assertEquals(Long.valueOf(1), sourceCounters.get("open"));
+    Assert.assertEquals(Long.valueOf(1), sourceCounters.get("close"));
+    Assert.assertTrue(sourceCounters.get("next") > 0);
+    Assert.assertEquals(Long.valueOf(1), sinkCounters.get("open"));
+    Assert.assertEquals(Long.valueOf(1), sinkCounters.get("close"));
+    Assert.assertTrue(sinkCounters.get("append") > 0);
 
     logger.info("Source counters:{} Sink counters:{}", sourceCounters,
         sinkCounters);