You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by sz...@apache.org on 2018/08/28 11:25:59 UTC

[1/2] flume git commit: FLUME-3050 add counters for error conditions and expose to monitor URL

Repository: flume
Updated Branches:
  refs/heads/trunk 368776ff7 -> 3a22cd4d8


http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-hive-sink/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/pom.xml b/flume-ng-sinks/flume-hive-sink/pom.xml
index 11a97da..951f205 100644
--- a/flume-ng-sinks/flume-hive-sink/pom.xml
+++ b/flume-ng-sinks/flume-hive-sink/pom.xml
@@ -239,6 +239,12 @@ limitations under the License.
     </dependency>
     <!-- end temporary -->
 
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java
index cc5cdca..8db008e 100644
--- a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java
+++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java
@@ -264,6 +264,7 @@ public class HiveSink extends AbstractSink implements Configurable {
       LOG.warn(getName() + ": Thread was interrupted.", err);
       return Status.BACKOFF;
     } catch (Exception e) {
+      sinkCounter.incrementEventWriteOrChannelFail(e);
       throw new EventDeliveryException(e);
     } finally {
       if (!success) {

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java
index c417404..fbb2de2 100644
--- a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java
+++ b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java
@@ -23,10 +23,12 @@ package org.apache.flume.sink.hive;
 import com.google.common.collect.Lists;
 import junit.framework.Assert;
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Transaction;
+import org.apache.flume.channel.BasicTransactionSemantics;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.SimpleEvent;
@@ -44,6 +46,8 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -129,10 +133,8 @@ public class TestHiveSink {
     TestUtil.dropDB(conf, dbName);
   }
 
-
-  @Test
-  public void testSingleWriterSimplePartitionedTable()
-          throws EventDeliveryException, IOException, CommandNeedRetryException {
+  public void testSingleWriter(boolean partitioned, String dbName, String tblName,
+                               Channel pChannel) throws Exception {
     int totalRecords = 4;
     int batchSize = 2;
     int batchCount = totalRecords / batchSize;
@@ -141,14 +143,16 @@ public class TestHiveSink {
     context.put("hive.metastore", metaStoreURI);
     context.put("hive.database",dbName);
     context.put("hive.table",tblName);
-    context.put("hive.partition", PART1_VALUE + "," + PART2_VALUE);
+    if (partitioned) {
+      context.put("hive.partition", PART1_VALUE + "," + PART2_VALUE);
+    }
     context.put("autoCreatePartitions","false");
     context.put("batchSize","" + batchSize);
     context.put("serializer", HiveDelimitedTextSerializer.ALIAS);
     context.put("serializer.fieldnames", COL1 + ",," + COL2 + ",");
     context.put("heartBeatInterval", "0");
 
-    Channel channel = startSink(sink, context);
+    Channel channel = startSink(sink, context, pChannel);
 
     List<String> bodies = Lists.newArrayList();
 
@@ -171,11 +175,17 @@ public class TestHiveSink {
     for (int i = 0; i < batchCount ; i++) {
       sink.process();
     }
+    checkRecordCountInTable(totalRecords, dbName, tblName);
     sink.stop();
     checkRecordCountInTable(totalRecords, dbName, tblName);
   }
 
   @Test
+  public void testSingleWriterSimplePartitionedTable() throws Exception {
+    testSingleWriter(true, dbName, tblName, null);
+  }
+
+  @Test
   public void testSingleWriterSimpleUnPartitionedTable()
           throws Exception {
     TestUtil.dropDB(conf, dbName2);
@@ -185,47 +195,7 @@ public class TestHiveSink {
                               null, dbLocation);
 
     try {
-      int totalRecords = 4;
-      int batchSize = 2;
-      int batchCount = totalRecords / batchSize;
-
-      Context context = new Context();
-      context.put("hive.metastore", metaStoreURI);
-      context.put("hive.database", dbName2);
-      context.put("hive.table", tblName2);
-      context.put("autoCreatePartitions","false");
-      context.put("batchSize","" + batchSize);
-      context.put("serializer", HiveDelimitedTextSerializer.ALIAS);
-      context.put("serializer.fieldnames", COL1 + ",," + COL2 + ",");
-      context.put("heartBeatInterval", "0");
-
-      Channel channel = startSink(sink, context);
-
-      List<String> bodies = Lists.newArrayList();
-
-      // Push the events in two batches
-      Transaction txn = channel.getTransaction();
-      txn.begin();
-      for (int j = 1; j <= totalRecords; j++) {
-        Event event = new SimpleEvent();
-        String body = j + ",blah,This is a log message,other stuff";
-        event.setBody(body.getBytes());
-        bodies.add(body);
-        channel.put(event);
-      }
-
-      txn.commit();
-      txn.close();
-
-      checkRecordCountInTable(0, dbName2, tblName2);
-      for (int i = 0; i < batchCount ; i++) {
-        sink.process();
-      }
-
-      // check before & after  stopping sink
-      checkRecordCountInTable(totalRecords, dbName2, tblName2);
-      sink.stop();
-      checkRecordCountInTable(totalRecords, dbName2, tblName2);
+      testSingleWriter(false, dbName2, tblName2, null);
     } finally {
       TestUtil.dropDB(conf, dbName2);
     }
@@ -398,6 +368,23 @@ public class TestHiveSink {
     checkRecordCountInTable(totalRecords, dbName, tblName);
   }
 
+  @Test
+  public void testErrorCounter() throws Exception {
+    Channel channel = Mockito.mock(Channel.class);
+    Mockito.when(channel.take()).thenThrow(new ChannelException("dummy"));
+    Transaction transaction = Mockito.mock(BasicTransactionSemantics.class);
+    Mockito.when(channel.getTransaction()).thenReturn(transaction);
+
+    try {
+      testSingleWriter(true, dbName, tblName, channel);
+    } catch (EventDeliveryException e) {
+      //Expected exception
+    }
+
+    SinkCounter sinkCounter = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter");
+    Assert.assertEquals(1, sinkCounter.getChannelReadFail());
+  }
+
   private void sleep(int n) {
     try {
       Thread.sleep(n);
@@ -406,9 +393,13 @@ public class TestHiveSink {
   }
 
   private static Channel startSink(HiveSink sink, Context context) {
+    return startSink(sink, context, null);
+  }
+
+  private static Channel startSink(HiveSink sink, Context context, Channel pChannel) {
     Configurables.configure(sink, context);
 
-    Channel channel = new MemoryChannel();
+    Channel channel = pChannel == null ? new MemoryChannel() : pChannel;
     Configurables.configure(channel, context);
     sink.setChannel(channel);
     sink.start();

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java b/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java
index 19020fd..08e887b 100644
--- a/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java
+++ b/flume-ng-sinks/flume-http-sink/src/main/java/org/apache/flume/sink/http/HttpSink.java
@@ -275,6 +275,7 @@ public class HttpSink extends AbstractSink implements Configurable {
           status = Status.BACKOFF;
 
           LOG.error("Error opening connection, or request timed out", e);
+          sinkCounter.incrementEventWriteFail();
         }
 
       } else {
@@ -289,6 +290,7 @@ public class HttpSink extends AbstractSink implements Configurable {
       status = Status.BACKOFF;
 
       LOG.error("Error sending HTTP request, retrying", t);
+      sinkCounter.incrementEventWriteOrChannelFail(t);
 
       // re-throw all Errors
       if (t instanceof Error) {

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java
index bee089c..175df2c 100644
--- a/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java
+++ b/flume-ng-sinks/flume-http-sink/src/test/java/org/apache/flume/sink/http/TestHttpSink.java
@@ -19,11 +19,13 @@
 package org.apache.flume.sink.http;
 
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.Sink.Status;
 import org.apache.flume.Transaction;
 import org.apache.flume.instrumentation.SinkCounter;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
@@ -216,6 +218,20 @@ public class TestHttpSink {
   }
 
   @Test
+  public void testErrorCounter() throws Exception {
+    RuntimeException exception = new RuntimeException("dummy");
+    when(channel.take()).thenThrow(exception);
+
+    Context context = new Context();
+    context.put("defaultRollback", "false");
+    context.put("defaultBackoff", "false");
+    context.put("defaultIncrementMetrics", "false");
+
+    executeWithMocks(false, Status.BACKOFF, false, false, context, HttpURLConnection.HTTP_OK);
+    inOrder(sinkCounter).verify(sinkCounter).incrementEventWriteOrChannelFail(exception);
+  }
+
+  @Test
   public void ensureSingleErrorStatusConfigurationCorrectlyUsed() throws Exception {
     when(channel.take()).thenReturn(event);
     when(event.getBody()).thenReturn("something".getBytes());

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java b/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java
index a62d27e..1c6e285 100644
--- a/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java
+++ b/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java
@@ -368,6 +368,7 @@ public class HBase2Sink extends AbstractSink implements Configurable {
       }
       logger.error("Failed to commit transaction." +
           "Transaction rolled back.", e);
+      sinkCounter.incrementEventWriteOrChannelFail(e);
       if (e instanceof Error || e instanceof RuntimeException) {
         logger.error("Failed to commit transaction." +
             "Transaction rolled back.", e);

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/TestHBase2Sink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/TestHBase2Sink.java b/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/TestHBase2Sink.java
index 0f482fc..277e0cf 100644
--- a/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/TestHBase2Sink.java
+++ b/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/TestHBase2Sink.java
@@ -35,6 +35,7 @@ import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.conf.ConfigurationException;
 import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -56,6 +57,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -472,6 +474,8 @@ public class TestHBase2Sink {
       Assert.fail("take() method should throw exception");
     } catch (ChannelException ex) {
       Assert.assertEquals("Mock Exception", ex.getMessage());
+      SinkCounter sinkCounter = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter");
+      Assert.assertEquals(1, sinkCounter.getChannelReadFail());
     }
     doReturn(e).when(channel).take();
     sink.process();
@@ -514,6 +518,8 @@ public class TestHBase2Sink {
       Assert.fail("FlumeException expected from serializer");
     } catch (FlumeException ex) {
       Assert.assertEquals("Exception for testing", ex.getMessage());
+      SinkCounter sinkCounter = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter");
+      Assert.assertEquals(1, sinkCounter.getEventWriteFail());
     }
     MockSimpleHBase2EventSerializer.throwException = false;
     sink.process();

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
index 39dd3bd..86a8a18 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
+++ b/flume-ng-sinks/flume-ng-kafka-sink/pom.xml
@@ -110,6 +110,12 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index d60d67e..7f347d8 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -250,6 +250,7 @@ public class KafkaSink extends AbstractSink implements Configurable {
     } catch (Exception ex) {
       String errorMsg = "Failed to publish events";
       logger.error("Failed to publish events", ex);
+      counter.incrementEventWriteOrChannelFail(ex);
       result = Status.BACKOFF;
       if (transaction != null) {
         try {

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
index d92c71f..92151cb 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
@@ -38,6 +38,7 @@ import org.apache.flume.Transaction;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.shared.kafka.test.KafkaPartitionTestUtil;
 import org.apache.flume.shared.kafka.test.PartitionOption;
 import org.apache.flume.shared.kafka.test.PartitionTestScenario;
@@ -49,6 +50,7 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -458,9 +460,17 @@ public class TestKafkaSink {
     doPartitionErrors(PartitionOption.NOTSET);
   }
 
-  @Test(expected = org.apache.flume.EventDeliveryException.class)
+  @Test
   public void testPartitionHeaderOutOfRange() throws Exception {
-    doPartitionErrors(PartitionOption.VALIDBUTOUTOFRANGE);
+    Sink kafkaSink = new KafkaSink();
+    try {
+      doPartitionErrors(PartitionOption.VALIDBUTOUTOFRANGE, kafkaSink);
+      fail();
+    } catch (EventDeliveryException e) {
+      //
+    }
+    SinkCounter sinkCounter = (SinkCounter) Whitebox.getInternalState(kafkaSink, "counter");
+    assertEquals(1, sinkCounter.getEventWriteFail());
   }
 
   @Test(expected = org.apache.flume.EventDeliveryException.class)
@@ -511,7 +521,10 @@ public class TestKafkaSink {
    * @throws Exception
    */
   private void doPartitionErrors(PartitionOption option) throws Exception {
-    Sink kafkaSink = new KafkaSink();
+    doPartitionErrors(option, new KafkaSink());
+  }
+
+  private void doPartitionErrors(PartitionOption option, Sink kafkaSink) throws Exception {
     Context context = prepareDefaultContext();
     context.put(KafkaSinkConstants.PARTITION_HEADER_NAME, "partition-header");
 

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-ng-morphline-solr-sink/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/pom.xml b/flume-ng-sinks/flume-ng-morphline-solr-sink/pom.xml
index 202e4fd..5f8732a 100644
--- a/flume-ng-sinks/flume-ng-morphline-solr-sink/pom.xml
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/pom.xml
@@ -117,6 +117,11 @@ limitations under the License.
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
index 0917d39..7d9f807 100644
--- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineSink.java
@@ -166,6 +166,7 @@ public class MorphlineSink extends AbstractSink implements Configurable {
       // Ooops - need to rollback and back off
       LOGGER.error("Morphline Sink " + getName() + ": Unable to process event from channel " +
           myChannel.getName() + ". Exception follows.", t);
+      sinkCounter.incrementEventWriteOrChannelFail(t);
       try {
         if (!isMorphlineTransactionCommitted) {
           handler.rollbackTransaction();

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineSolrSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineSolrSink.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineSolrSink.java
index 1bfae95..100e82e 100644
--- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineSolrSink.java
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineSolrSink.java
@@ -28,15 +28,19 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.BasicTransactionSemantics;
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.channel.ReplicatingChannelSelector;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrServer;
@@ -47,6 +51,8 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -295,6 +301,19 @@ public class TestMorphlineSolrSink extends SolrTestCaseJ4 {
   }
 
   @Test
+  public void testErrorCounters() throws Exception {
+    Channel channel = Mockito.mock(Channel.class);
+    Mockito.when(channel.take()).thenThrow(new ChannelException("dummy"));
+    Transaction transaction = Mockito.mock(BasicTransactionSemantics.class);
+    Mockito.when(channel.getTransaction()).thenReturn(transaction);
+    sink.setChannel(channel);
+    sink.process();
+
+    SinkCounter sinkCounter = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter");
+    assertEquals(1, sinkCounter.getChannelReadFail());
+  }
+
+  @Test
   public void testAvroRoundTrip() throws Exception {
     String file = RESOURCES_DIR + "/test-documents" + "/sample-statuses-20120906-141433.avro";
     testDocumentTypesInternal(file);

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
index e5ed969..5dd82c9 100644
--- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
+++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSSource.java
@@ -295,11 +295,13 @@ public class JMSSource extends AbstractPollableSource {
       logger.warn("Error appending event to channel. "
           + "Channel might be full. Consider increasing the channel "
           + "capacity or make sure the sinks perform faster.", channelException);
+      sourceCounter.incrementChannelWriteFail();
     } catch (JMSException jmsException) {
       logger.warn("JMSException consuming events", jmsException);
       if (++jmsExceptionCounter > errorThreshold) {
         if (consumer != null) {
           logger.warn("Exceeded JMSException threshold, closing consumer");
+          sourceCounter.incrementEventReadFail();
           consumer.rollback();
           consumer.close();
           consumer = null;
@@ -307,6 +309,7 @@ public class JMSSource extends AbstractPollableSource {
       }
     } catch (Throwable throwable) {
       logger.error("Unexpected error processing events", throwable);
+      sourceCounter.incrementEventReadFail();
       if (throwable instanceof Error) {
         throw (Error) throwable;
       }

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java
index ed81b75..2818c5b 100644
--- a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java
+++ b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSSource.java
@@ -41,7 +41,9 @@ import org.apache.flume.FlumeException;
 import org.apache.flume.PollableSource.Status;
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.SourceCounter;
 import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -338,7 +340,30 @@ public class TestJMSSource extends JMSMessageConsumerTestBase {
       Assert.assertEquals(Status.BACKOFF, source.process());
     }
     Assert.assertEquals(Status.BACKOFF, source.process());
+    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(source, "sourceCounter");
+    Assert.assertEquals(1, sc.getEventReadFail());
     verify(consumer, times(attempts + 1)).rollback();
     verify(consumer, times(1)).close();
   }
+
+  @Test
+  public void testErrorCounterEventReadFail() throws Exception {
+    source.configure(context);
+    source.start();
+    when(consumer.take()).thenThrow(new RuntimeException("dummy"));
+    source.process();
+    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(source, "sourceCounter");
+    Assert.assertEquals(1, sc.getEventReadFail());
+  }
+
+  @Test
+  public void testErrorCounterChannelWriteFail() throws Exception {
+    source.configure(context);
+    source.start();
+    when(source.getChannelProcessor()).thenThrow(new ChannelException("dummy"));
+    source.process();
+    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(source, "sourceCounter");
+    Assert.assertEquals(1, sc.getChannelWriteFail());
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index ffdc96e..8053b41 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -315,6 +315,7 @@ public class KafkaSource extends AbstractPollableSource
       return Status.BACKOFF;
     } catch (Exception e) {
       log.error("KafkaSource EXCEPTION, {}", e);
+      counter.incrementEventReadOrChannelFail(e);
       return Status.BACKOFF;
     }
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
index 7804fa2..bb20e35 100644
--- a/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/test/java/org/apache/flume/source/kafka/TestKafkaSource.java
@@ -34,6 +34,7 @@ import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
 import org.apache.flume.PollableSource.Status;
 import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -47,6 +48,8 @@ import org.apache.kafka.common.security.JaasUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
@@ -82,6 +85,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 
 public class TestKafkaSource {
@@ -429,6 +433,36 @@ public class TestKafkaSource {
   }
 
   @Test
+  public void testErrorCounters() throws InterruptedException, EventDeliveryException {
+    context.put(TOPICS, topic0);
+    context.put(BATCH_SIZE, "1");
+    kafkaSource.configure(context);
+
+    ChannelProcessor cp = Mockito.mock(ChannelProcessor.class);
+    doThrow(new ChannelException("dummy")).doThrow(new RuntimeException("dummy"))
+        .when(cp).processEventBatch(any(List.class));
+    kafkaSource.setChannelProcessor(cp);
+
+    kafkaSource.start();
+
+    Thread.sleep(500L);
+
+    kafkaServer.produce(topic0, "", "hello, world");
+
+    Thread.sleep(500L);
+
+    kafkaSource.doProcess();
+    kafkaSource.doProcess();
+
+    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(kafkaSource, "counter");
+    Assert.assertEquals(1, sc.getChannelWriteFail());
+    Assert.assertEquals(1, sc.getEventReadFail());
+
+    kafkaSource.stop();
+  }
+
+
+  @Test
   public void testSourceProperties() {
     Context context = new Context();
     context.put(TOPICS, "test1, test2");

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sources/flume-scribe-source/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-scribe-source/pom.xml b/flume-ng-sources/flume-scribe-source/pom.xml
index 933f5aa..1b15874 100644
--- a/flume-ng-sources/flume-scribe-source/pom.xml
+++ b/flume-ng-sources/flume-scribe-source/pom.xml
@@ -178,6 +178,12 @@ limitations under the License.
       <artifactId>libthrift</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java b/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java
index 551fe1f..b45b7fc 100644
--- a/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java
+++ b/flume-ng-sources/flume-scribe-source/src/main/java/org/apache/flume/source/scribe/ScribeSource.java
@@ -176,6 +176,7 @@ public class ScribeSource extends AbstractSource implements
           return ResultCode.OK;
         } catch (Exception e) {
           LOG.warn("Scribe source handling failure", e);
+          sourceCounter.incrementEventReadOrChannelFail(e);
         }
       }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sources/flume-scribe-source/src/test/java/org/apache/flume/source/scribe/TestScribeSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-scribe-source/src/test/java/org/apache/flume/source/scribe/TestScribeSource.java b/flume-ng-sources/flume-scribe-source/src/test/java/org/apache/flume/source/scribe/TestScribeSource.java
index 9059eba..aba3e49 100644
--- a/flume-ng-sources/flume-scribe-source/src/test/java/org/apache/flume/source/scribe/TestScribeSource.java
+++ b/flume-ng-sources/flume-scribe-source/src/test/java/org/apache/flume/source/scribe/TestScribeSource.java
@@ -24,6 +24,7 @@ import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.channel.ReplicatingChannelSelector;
 import org.apache.flume.conf.Configurables;
+import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.transport.TFramedTransport;
@@ -32,12 +33,20 @@ import org.apache.thrift.transport.TTransport;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
 
 import java.io.IOException;
 import java.net.ServerSocket;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
 /**
  *
  */
@@ -80,8 +89,7 @@ public class TestScribeSource {
     scribeSource.start();
   }
 
-  @Test
-  public void testScribeMessage() throws Exception {
+  private void sendSingle() throws org.apache.thrift.TException {
     TTransport transport = new TFramedTransport(new TSocket("localhost", port));
 
     TProtocol protocol = new TBinaryProtocol(transport);
@@ -91,6 +99,11 @@ public class TestScribeSource {
     List<LogEntry> logEntries = new ArrayList<LogEntry>(1);
     logEntries.add(logEntry);
     client.Log(logEntries);
+  }
+
+  @Test
+  public void testScribeMessage() throws Exception {
+    sendSingle();
 
     // try to get it from Channels
     Transaction tx = memoryChannel.getTransaction();
@@ -131,6 +144,21 @@ public class TestScribeSource {
     tx.close();
   }
 
+  @Test
+  public void testErrorCounter() throws Exception {
+    ChannelProcessor cp = mock(ChannelProcessor.class);
+    doThrow(new ChannelException("dummy")).when(cp).processEventBatch(anyListOf(Event.class));
+    ChannelProcessor origCp = scribeSource.getChannelProcessor();
+    scribeSource.setChannelProcessor(cp);
+
+    sendSingle();
+
+    scribeSource.setChannelProcessor(origCp);
+
+    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(scribeSource, "sourceCounter");
+    org.junit.Assert.assertEquals(1, sc.getChannelWriteFail());
+  }
+
   @AfterClass
   public static void cleanup() {
     memoryChannel.stop();

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sources/flume-taildir-source/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/pom.xml b/flume-ng-sources/flume-taildir-source/pom.xml
index bd5a707..011532e 100644
--- a/flume-ng-sources/flume-taildir-source/pom.xml
+++ b/flume-ng-sources/flume-taildir-source/pom.xml
@@ -41,6 +41,13 @@ limitations under the License.
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
index a107a01..0c656d6 100644
--- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
+++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
@@ -234,6 +234,7 @@ public class TaildirSource extends AbstractSource implements
       }
     } catch (Throwable t) {
       logger.error("Unable to tail files", t);
+      sourceCounter.incrementEventReadFail();
       status = Status.BACKOFF;
     }
     return status;
@@ -265,6 +266,7 @@ public class TaildirSource extends AbstractSource implements
       } catch (ChannelException ex) {
         logger.warn("The channel is full or unexpected failure. " +
             "The source will try again after " + retryInterval + " ms");
+        sourceCounter.incrementChannelWriteFail();
         TimeUnit.MILLISECONDS.sleep(retryInterval);
         retryInterval = retryInterval << 1;
         retryInterval = Math.min(retryInterval, maxRetryInterval);
@@ -306,6 +308,7 @@ public class TaildirSource extends AbstractSource implements
         }
       } catch (Throwable t) {
         logger.error("Uncaught exception in IdleFileChecker thread", t);
+        sourceCounter.incrementGenericProcessingFail();
       }
     }
   }
@@ -332,11 +335,13 @@ public class TaildirSource extends AbstractSource implements
       }
     } catch (Throwable t) {
       logger.error("Failed writing positionFile", t);
+      sourceCounter.incrementGenericProcessingFail();
     } finally {
       try {
         if (writer != null) writer.close();
       } catch (IOException e) {
         logger.error("Error: " + e.getMessage(), e);
+        sourceCounter.incrementGenericProcessingFail();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
index 097ee0b..6825cc5 100644
--- a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
+++ b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
@@ -17,10 +17,15 @@
 
 package org.apache.flume.source.taildir;
 
+import static org.mockito.Mockito.anyListOf;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.when;
+
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -34,11 +39,15 @@ import org.apache.flume.lifecycle.LifecycleState;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILE_GROUPS;
 import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILE_GROUPS_PREFIX;
@@ -202,8 +211,7 @@ public class TestTaildirSource {
     }
   }
 
-  @Test
-  public void testFileConsumeOrder() throws IOException {
+  private ArrayList<String> prepareFileConsumeOrder() throws IOException {
     System.out.println(tmpDir.toString());
     // 1) Create 1st file
     File f1 = new File(tmpDir, "file1");
@@ -257,13 +265,31 @@ public class TestTaildirSource {
     f3.setLastModified(System.currentTimeMillis());
 
     // 4) Consume the files
-    ArrayList<String> consumedOrder = Lists.newArrayList();
     Context context = new Context();
     context.put(POSITION_FILE, posFilePath);
     context.put(FILE_GROUPS, "g1");
     context.put(FILE_GROUPS_PREFIX + "g1", tmpDir.getAbsolutePath() + "/.*");
 
     Configurables.configure(source, context);
+
+    // 6) Ensure consumption order is in order of last update time
+    ArrayList<String> expected = Lists.newArrayList(line1, line2, line3,    // file1
+        line1b, line2b, line3b, // file2
+        line1d, line2d, line3d, // file4
+        line1c, line2c, line3c  // file3
+    );
+    for (int i = 0; i != expected.size(); ++i) {
+      expected.set(i, expected.get(i).trim());
+    }
+
+    return expected;
+  }
+
+  @Test
+  public void testFileConsumeOrder() throws IOException {
+    ArrayList<String> consumedOrder = Lists.newArrayList();
+    ArrayList<String> expected = prepareFileConsumeOrder();
+
     source.start();
     source.process();
     Transaction txn = channel.getTransaction();
@@ -278,21 +304,11 @@ public class TestTaildirSource {
 
     System.out.println(consumedOrder);
 
-    // 6) Ensure consumption order is in order of last update time
-    ArrayList<String> expected = Lists.newArrayList(line1, line2, line3,    // file1
-                                                    line1b, line2b, line3b, // file2
-                                                    line1d, line2d, line3d, // file4
-                                                    line1c, line2c, line3c  // file3
-                                                   );
-    for (int i = 0; i != expected.size(); ++i) {
-      expected.set(i, expected.get(i).trim());
-    }
     assertArrayEquals("Files not consumed in expected order", expected.toArray(),
                       consumedOrder.toArray());
   }
 
-  @Test
-  public void testPutFilenameHeader() throws IOException {
+  private File configureSource()  throws IOException {
     File f1 = new File(tmpDir, "file1");
     Files.write("f1\n", f1, Charsets.UTF_8);
 
@@ -304,6 +320,13 @@ public class TestTaildirSource {
     context.put(FILENAME_HEADER_KEY, "path");
 
     Configurables.configure(source, context);
+
+    return f1;
+  }
+
+  @Test
+  public void testPutFilenameHeader() throws IOException {
+    File f1 = configureSource();
     source.start();
     source.process();
     Transaction txn = channel.getTransaction();
@@ -316,4 +339,45 @@ public class TestTaildirSource {
     assertEquals(f1.getAbsolutePath(),
             e.getHeaders().get("path"));
   }
+
+  @Test
+  public void testErrorCounterEventReadFail() throws Exception {
+    configureSource();
+    source.start();
+    ReliableTaildirEventReader reader = Mockito.mock(ReliableTaildirEventReader.class);
+    Whitebox.setInternalState(source, "reader", reader);
+    when(reader.updateTailFiles()).thenReturn(Collections.singletonList(123L));
+    when(reader.getTailFiles()).thenThrow(new RuntimeException("hello"));
+    source.process();
+    assertEquals(1, source.getSourceCounter().getEventReadFail());
+    source.stop();
+  }
+
+  @Test
+  public void testErrorCounterFileHandlingFail() throws Exception {
+    configureSource();
+    Whitebox.setInternalState(source, "idleTimeout", 0);
+    Whitebox.setInternalState(source, "checkIdleInterval", 60);
+    source.start();
+    ReliableTaildirEventReader reader = Mockito.mock(ReliableTaildirEventReader.class);
+    when(reader.getTailFiles()).thenThrow(new RuntimeException("hello"));
+    Whitebox.setInternalState(source, "reader", reader);
+    TimeUnit.MILLISECONDS.sleep(200);
+    assertTrue(0 < source.getSourceCounter().getGenericProcessingFail());
+    source.stop();
+  }
+
+  @Test
+  public void testErrorCounterChannelWriteFail() throws Exception {
+    prepareFileConsumeOrder();
+    ChannelProcessor cp = Mockito.mock(ChannelProcessor.class);
+    source.setChannelProcessor(cp);
+    doThrow(new ChannelException("dummy")).doNothing().when(cp)
+        .processEventBatch(anyListOf(Event.class));
+    source.start();
+    source.process();
+    assertEquals(1, source.getSourceCounter().getChannelWriteFail());
+    source.stop();
+  }
+
 }


[2/2] flume git commit: FLUME-3050 add counters for error conditions and expose to monitor URL

Posted by sz...@apache.org.
FLUME-3050 add counters for error conditions and expose to monitor URL

By introducing error counters it will be easier to monitor problems.
Also errors are categorized, hopefully this will help setting up better
monitoring solutions.

Concept: an error is when an Exception is thrown or an ERROR level log is
written during event processing. For each error, at least one error counter
is incremented at least once (preferably exactly one counter, exactly once).
Errors during event processing are counted.
Initialization errors are not handled here.
3 types of errors are differentiated.
- Channel read/write errors from the channel when the channel
  throws a ChannelException.
- Event read/write errors, e.g. a source cannot read an event or a sink
  cannot write an event.
- Generic errors - e.g.: TaildirSource cannot write position file.

This closes #222

Reviewers: Peter Turcsanyi, Ferenc Szabo

(Endre Major via Ferenc Szabo)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/3a22cd4d
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/3a22cd4d
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/3a22cd4d

Branch: refs/heads/trunk
Commit: 3a22cd4d8bc47f0e7c30bba93186ad0cf602c07e
Parents: 368776f
Author: Endre Major <em...@cloudera.com>
Authored: Tue Aug 28 13:25:08 2018 +0200
Committer: Ferenc Szabo <sz...@apache.org>
Committed: Tue Aug 28 13:25:08 2018 +0200

----------------------------------------------------------------------
 .../apache/flume/client/avro/AvroCLIClient.java |   5 +-
 .../avro/ReliableSpoolingFileEventReader.java   |  20 ++-
 .../flume/instrumentation/SinkCounter.java      |  36 +++-
 .../flume/instrumentation/SinkCounterMBean.java |   5 +
 .../flume/instrumentation/SourceCounter.java    |  50 +++++-
 .../instrumentation/SourceCounterMBean.java     |   7 +
 .../org/apache/flume/sink/AbstractRpcSink.java  |   2 +
 .../org/apache/flume/sink/RollingFileSink.java  |   1 +
 .../org/apache/flume/source/AvroSource.java     |   2 +
 .../flume/source/MultiportSyslogTCPSource.java  |   4 +
 .../flume/source/SequenceGeneratorSource.java   |   1 +
 .../flume/source/SpoolDirectorySource.java      |  14 +-
 .../apache/flume/source/SyslogTcpSource.java    |   2 +
 .../apache/flume/source/SyslogUDPSource.java    |   2 +
 .../org/apache/flume/source/ThriftSource.java   |   2 +
 .../apache/flume/source/http/HTTPSource.java    |   4 +
 .../TestReliableSpoolingFileEventReader.java    |  27 ++-
 .../client/avro/TestSpoolingFileLineReader.java |   2 +
 .../org/apache/flume/sink/TestAvroSink.java     |  40 +++++
 .../apache/flume/sink/TestRollingFileSink.java  | 168 ++++++-------------
 .../org/apache/flume/source/TestAvroSource.java |  31 ++++
 .../source/TestMultiportSyslogTCPSource.java    | 102 ++++++++---
 .../flume/source/TestSpoolDirectorySource.java  |  70 ++++++++
 .../flume/source/TestSyslogTcpSource.java       |  43 +++++
 .../flume/source/TestSyslogUdpSource.java       |  53 ++++--
 .../apache/flume/source/TestThriftSource.java   |  37 ++++
 .../flume/source/http/TestHTTPSource.java       |  24 ++-
 .../apache/flume/sink/hdfs/HDFSEventSink.java   |   2 +
 .../flume/sink/hdfs/TestHDFSEventSink.java      |  37 ++++
 flume-ng-sinks/flume-hive-sink/pom.xml          |   6 +
 .../org/apache/flume/sink/hive/HiveSink.java    |   1 +
 .../apache/flume/sink/hive/TestHiveSink.java    |  87 +++++-----
 .../org/apache/flume/sink/http/HttpSink.java    |   2 +
 .../apache/flume/sink/http/TestHttpSink.java    |  16 ++
 .../apache/flume/sink/hbase2/HBase2Sink.java    |   1 +
 .../flume/sink/hbase2/TestHBase2Sink.java       |   6 +
 flume-ng-sinks/flume-ng-kafka-sink/pom.xml      |   6 +
 .../org/apache/flume/sink/kafka/KafkaSink.java  |   1 +
 .../apache/flume/sink/kafka/TestKafkaSink.java  |  19 ++-
 .../flume-ng-morphline-solr-sink/pom.xml        |   5 +
 .../sink/solr/morphline/MorphlineSink.java      |   1 +
 .../solr/morphline/TestMorphlineSolrSink.java   |  19 +++
 .../org/apache/flume/source/jms/JMSSource.java  |   3 +
 .../apache/flume/source/jms/TestJMSSource.java  |  25 +++
 .../apache/flume/source/kafka/KafkaSource.java  |   1 +
 .../flume/source/kafka/TestKafkaSource.java     |  34 ++++
 flume-ng-sources/flume-scribe-source/pom.xml    |   6 +
 .../flume/source/scribe/ScribeSource.java       |   1 +
 .../flume/source/scribe/TestScribeSource.java   |  32 +++-
 flume-ng-sources/flume-taildir-source/pom.xml   |   7 +
 .../flume/source/taildir/TaildirSource.java     |   5 +
 .../flume/source/taildir/TestTaildirSource.java |  92 ++++++++--
 52 files changed, 940 insertions(+), 229 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
index dd2aeef..242c821 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
@@ -43,6 +43,7 @@ import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.annotations.InterfaceStability;
 import org.apache.flume.api.RpcClient;
 import org.apache.flume.api.RpcClientFactory;
+import org.apache.flume.instrumentation.SourceCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -203,7 +204,9 @@ public class AvroCLIClient {
         reader = new SimpleTextLineEventReader(new FileReader(new File(fileName)));
       } else if (dirName != null) {
         reader = new ReliableSpoolingFileEventReader.Builder()
-            .spoolDirectory(new File(dirName)).build();
+            .spoolDirectory(new File(dirName))
+            .sourceCounter(new SourceCounter("avrocli"))
+            .build();
       } else {
         reader = new SimpleTextLineEventReader(new InputStreamReader(System.in));
       }

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
index 830d21b..e8d5a36 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
@@ -29,6 +29,7 @@ import org.apache.flume.FlumeException;
 import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.annotations.InterfaceStability;
 import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.serialization.DecodeErrorPolicy;
 import org.apache.flume.serialization.DurablePositionTracker;
 import org.apache.flume.serialization.EventDeserializer;
@@ -59,6 +60,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Set;
+import java.util.UUID;
 import java.util.regex.Pattern;
 
 /**
@@ -110,6 +112,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
   private final DecodeErrorPolicy decodeErrorPolicy;
   private final ConsumeOrder consumeOrder;
   private final boolean recursiveDirectorySearch;
+  private final SourceCounter sourceCounter;
 
   private Optional<FileInfo> currentFile = Optional.absent();
   /** Always contains the last file from which lines have been read. */
@@ -133,8 +136,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
       String deserializerType, Context deserializerContext,
       String deletePolicy, String trackingPolicy, String inputCharset,
       DecodeErrorPolicy decodeErrorPolicy,
-      ConsumeOrder consumeOrder,
-      boolean recursiveDirectorySearch) throws IOException {
+      ConsumeOrder consumeOrder, boolean recursiveDirectorySearch,
+                                          SourceCounter sourceCounter) throws IOException {
 
     // Sanity checks
     Preconditions.checkNotNull(spoolDirectory);
@@ -147,6 +150,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
     Preconditions.checkNotNull(deletePolicy);
     Preconditions.checkNotNull(trackingPolicy);
     Preconditions.checkNotNull(inputCharset);
+    Preconditions.checkNotNull(sourceCounter);
 
     // validate delete policy
     if (!deletePolicy.equalsIgnoreCase(DeletePolicy.NEVER.name()) &&
@@ -209,6 +213,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
     this.decodeErrorPolicy = Preconditions.checkNotNull(decodeErrorPolicy);
     this.consumeOrder = Preconditions.checkNotNull(consumeOrder);
     this.recursiveDirectorySearch = recursiveDirectorySearch;
+    this.sourceCounter = sourceCounter;
 
     trackerDirectory = new File(trackerDirPath);
 
@@ -287,6 +292,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
     } catch (IOException e) {
       logger.error("I/O exception occurred while listing directories. " +
                    "Files already matched will be returned. " + directory, e);
+      sourceCounter.incrementGenericProcessingFail();
     }
 
     return candidateFiles;
@@ -508,6 +514,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
         if (!deleted) {
           logger.error("Unable to delete file " + fileToRoll.getAbsolutePath() +
               ". It will likely be ingested another time.");
+          sourceCounter.incrementGenericProcessingFail();
         }
       } else {
         String message = "File name has been re-used with different" +
@@ -681,6 +688,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
       return Optional.absent();
     } catch (IOException e) {
       logger.error("Exception opening file: " + file, e);
+      sourceCounter.incrementGenericProcessingFail();
       return Optional.absent();
     }
   }
@@ -777,6 +785,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
         SpoolDirectorySourceConfigurationConstants.DEFAULT_CONSUME_ORDER;
     private boolean recursiveDirectorySearch =
         SpoolDirectorySourceConfigurationConstants.DEFAULT_RECURSIVE_DIRECTORY_SEARCH;
+    private SourceCounter sourceCounter;
 
     public Builder spoolDirectory(File directory) {
       this.spoolDirectory = directory;
@@ -863,12 +872,17 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
       return this;
     }
 
+    public Builder sourceCounter(SourceCounter sourceCounter) {
+      this.sourceCounter = sourceCounter;
+      return this;
+    }
+
     public ReliableSpoolingFileEventReader build() throws IOException {
       return new ReliableSpoolingFileEventReader(spoolDirectory, completedSuffix,
           includePattern, ignorePattern, trackerDirPath, annotateFileName, fileNameHeader,
           annotateBaseName, baseNameHeader, deserializerType,
           deserializerContext, deletePolicy, trackingPolicy, inputCharset, decodeErrorPolicy,
-          consumeOrder, recursiveDirectorySearch);
+          consumeOrder, recursiveDirectorySearch, sourceCounter);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java
index 534adc8..8c77959 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java
@@ -18,6 +18,7 @@
 package org.apache.flume.instrumentation;
 
 import org.apache.commons.lang.ArrayUtils;
+import org.apache.flume.ChannelException;
 
 public class SinkCounter extends MonitoredCounterGroup implements
     SinkCounterMBean {
@@ -46,11 +47,18 @@ public class SinkCounter extends MonitoredCounterGroup implements
   private static final String COUNTER_EVENT_DRAIN_SUCCESS =
       "sink.event.drain.sucess";
 
+  private static final String COUNTER_EVENT_WRITE_FAIL =
+      "sink.event.write.fail";
+
+  private static final String COUNTER_CHANNEL_READ_FAIL =
+      "sink.channel.read.fail";
+
   private static final String[] ATTRIBUTES = {
     COUNTER_CONNECTION_CREATED, COUNTER_CONNECTION_CLOSED,
     COUNTER_CONNECTION_FAILED, COUNTER_BATCH_EMPTY,
     COUNTER_BATCH_UNDERFLOW, COUNTER_BATCH_COMPLETE,
-    COUNTER_EVENT_DRAIN_ATTEMPT, COUNTER_EVENT_DRAIN_SUCCESS
+    COUNTER_EVENT_DRAIN_ATTEMPT, COUNTER_EVENT_DRAIN_SUCCESS,
+    COUNTER_EVENT_WRITE_FAIL, COUNTER_CHANNEL_READ_FAIL
   };
 
   public SinkCounter(String name) {
@@ -141,4 +149,30 @@ public class SinkCounter extends MonitoredCounterGroup implements
   public long addToEventDrainSuccessCount(long delta) {
     return addAndGet(COUNTER_EVENT_DRAIN_SUCCESS, delta);
   }
+
+  public long incrementEventWriteFail() {
+    return increment(COUNTER_EVENT_WRITE_FAIL);
+  }
+
+  @Override
+  public long getEventWriteFail() {
+    return get(COUNTER_EVENT_WRITE_FAIL);
+  }
+
+  public long incrementChannelReadFail() {
+    return increment(COUNTER_CHANNEL_READ_FAIL);
+  }
+
+  @Override
+  public long getChannelReadFail() {
+    return get(COUNTER_CHANNEL_READ_FAIL);
+  }
+
+  public long incrementEventWriteOrChannelFail(Throwable t) {
+    if (t instanceof ChannelException) {
+      return incrementChannelReadFail();
+    }
+    return incrementEventWriteFail();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java
index 472a4dd..0ca265a 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java
@@ -48,4 +48,9 @@ public interface SinkCounterMBean {
   long getStopTime();
 
   String getType();
+
+  long getEventWriteFail();
+
+  long getChannelReadFail();
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java
index f96694e..0e4d971 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java
@@ -19,6 +19,7 @@
 package org.apache.flume.instrumentation;
 
 import org.apache.commons.lang.ArrayUtils;
+import org.apache.flume.ChannelException;
 
 public class SourceCounter extends MonitoredCounterGroup implements
     SourceCounterMBean {
@@ -37,15 +38,25 @@ public class SourceCounter extends MonitoredCounterGroup implements
       "src.append-batch.received";
   private static final String COUNTER_APPEND_BATCH_ACCEPTED =
       "src.append-batch.accepted";
-  
+
   private static final String COUNTER_OPEN_CONNECTION_COUNT =
           "src.open-connection.count";
 
+  private static final String COUNTER_EVENT_READ_FAIL =
+      "src.event.read.fail";
+
+  private static final String COUNTER_GENERIC_PROCESSING_FAIL =
+      "src.generic.processing.fail";
+
+  private static final String COUNTER_CHANNEL_WRITE_FAIL =
+      "src.channel.write.fail";
+
   private static final String[] ATTRIBUTES = {
     COUNTER_EVENTS_RECEIVED, COUNTER_EVENTS_ACCEPTED,
     COUNTER_APPEND_RECEIVED, COUNTER_APPEND_ACCEPTED,
     COUNTER_APPEND_BATCH_RECEIVED, COUNTER_APPEND_BATCH_ACCEPTED,
-    COUNTER_OPEN_CONNECTION_COUNT
+    COUNTER_OPEN_CONNECTION_COUNT, COUNTER_EVENT_READ_FAIL,
+    COUNTER_CHANNEL_WRITE_FAIL, COUNTER_GENERIC_PROCESSING_FAIL
   };
 
   public SourceCounter(String name) {
@@ -126,4 +137,39 @@ public class SourceCounter extends MonitoredCounterGroup implements
   public void setOpenConnectionCount(long openConnectionCount) {
     set(COUNTER_OPEN_CONNECTION_COUNT, openConnectionCount);
   }
+
+  public long incrementEventReadFail() {
+    return increment(COUNTER_EVENT_READ_FAIL);
+  }
+
+  @Override
+  public long getEventReadFail() {
+    return get(COUNTER_EVENT_READ_FAIL);
+  }
+
+  public long incrementChannelWriteFail() {
+    return increment(COUNTER_CHANNEL_WRITE_FAIL);
+  }
+
+  @Override
+  public long getChannelWriteFail() {
+    return get(COUNTER_CHANNEL_WRITE_FAIL);
+  }
+
+  public long incrementGenericProcessingFail() {
+    return increment(COUNTER_GENERIC_PROCESSING_FAIL);
+  }
+
+  @Override
+  public long getGenericProcessingFail() {
+    return get(COUNTER_GENERIC_PROCESSING_FAIL);
+  }
+
+  public long incrementEventReadOrChannelFail(Throwable t) {
+    if (t instanceof ChannelException) {
+      return incrementChannelWriteFail();
+    }
+    return incrementEventReadFail();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java
index 5ccbed4..bfbfe3e 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java
@@ -45,4 +45,11 @@ public interface SourceCounterMBean {
   String getType();
 
   long getOpenConnectionCount();
+
+  long getEventReadFail();
+
+  long getChannelWriteFail();
+
+  long getGenericProcessingFail();
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
index f6024aa..5a5993a 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
@@ -383,8 +383,10 @@ public abstract class AbstractRpcSink extends AbstractSink implements Configurab
       } else if (t instanceof ChannelException) {
         logger.error("Rpc Sink " + getName() + ": Unable to get event from" +
             " channel " + channel.getName() + ". Exception follows.", t);
+        sinkCounter.incrementChannelReadFail();
         status = Status.BACKOFF;
       } else {
+        sinkCounter.incrementEventWriteFail();
         destroyConnection();
         throw new EventDeliveryException("Failed to send events", t);
       }

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
index ee4b947..9b0827a 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
@@ -220,6 +220,7 @@ public class RollingFileSink extends AbstractSink implements Configurable {
       transaction.commit();
       sinkCounter.addToEventDrainSuccessCount(eventAttemptCounter);
     } catch (Exception ex) {
+      sinkCounter.incrementEventWriteOrChannelFail(ex);
       transaction.rollback();
       throw new EventDeliveryException("Failed to process transaction", ex);
     } finally {

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
index 623e61e..a105bbe 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
@@ -363,6 +363,7 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
     } catch (ChannelException ex) {
       logger.warn("Avro source " + getName() + ": Unable to process event. " +
           "Exception follows.", ex);
+      sourceCounter.incrementChannelWriteFail();
       return Status.FAILED;
     }
 
@@ -393,6 +394,7 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
     } catch (Throwable t) {
       logger.error("Avro source " + getName() + ": Unable to process event " +
           "batch. Exception follows.", t);
+      sourceCounter.incrementChannelWriteFail();
       if (t instanceof Error) {
         throw (Error) t;
       }

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
index 201d8e7..3c59b47 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
@@ -243,6 +243,7 @@ public class MultiportSyslogTCPSource extends AbstractSource implements
     public void exceptionCaught(IoSession session, Throwable cause)
         throws Exception {
       logger.error("Error in syslog message handler", cause);
+      sourceCounter.incrementGenericProcessingFail();
       if (cause instanceof Error) {
         Throwables.propagate(cause);
       }
@@ -320,6 +321,7 @@ public class MultiportSyslogTCPSource extends AbstractSource implements
           sourceCounter.addToEventAcceptedCount(numEvents);
         } catch (Throwable t) {
           logger.error("Error writing to channel, event dropped", t);
+          sourceCounter.incrementEventReadOrChannelFail(t);
           if (t instanceof Error) {
             Throwables.propagate(t);
           }
@@ -341,6 +343,7 @@ public class MultiportSyslogTCPSource extends AbstractSource implements
       } catch (Throwable t) {
         logger.info("Error decoding line with charset (" + decoder.charset() +
             "). Exception follows.", t);
+        sourceCounter.incrementEventReadFail();
 
         if (t instanceof Error) {
           Throwables.propagate(t);
@@ -377,6 +380,7 @@ public class MultiportSyslogTCPSource extends AbstractSource implements
         event.getHeaders().put(SyslogUtils.EVENT_STATUS,
             SyslogUtils.SyslogStatus.INVALID.getSyslogStatus());
         logger.debug("Error parsing syslog event", ex);
+        sourceCounter.incrementEventReadFail();
       }
 
       return event;

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
index eaa9ef3..e494bfb 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
@@ -93,6 +93,7 @@ public class SequenceGeneratorSource extends AbstractPollableSource implements
       eventsSent = eventsSentTX;
     } catch (ChannelException ex) {
       logger.error( getName() + " source could not write to channel.", ex);
+      sourceCounter.incrementChannelWriteFail();
     }
 
     return status;

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
index 68bbe7f..305ca3b 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
@@ -106,6 +106,7 @@ public class SpoolDirectorySource extends AbstractSource
           .consumeOrder(consumeOrder)
           .recursiveDirectorySearch(recursiveDirectorySearch)
           .trackingPolicy(trackingPolicy)
+          .sourceCounter(sourceCounter)
           .build();
     } catch (IOException ioe) {
       throw new FlumeException("Error instantiating spooling event parser",
@@ -235,7 +236,8 @@ public class SpoolDirectorySource extends AbstractSource
     return recursiveDirectorySearch;
   }
 
-  private class SpoolDirectoryRunnable implements Runnable {
+  @VisibleForTesting
+  protected class SpoolDirectoryRunnable implements Runnable {
     private ReliableSpoolingFileEventReader reader;
     private SourceCounter sourceCounter;
 
@@ -248,9 +250,12 @@ public class SpoolDirectorySource extends AbstractSource
     @Override
     public void run() {
       int backoffInterval = 250;
+      boolean readingEvents = false;
       try {
         while (!Thread.interrupted()) {
+          readingEvents = true;
           List<Event> events = reader.readEvents(batchSize);
+          readingEvents = false;
           if (events.isEmpty()) {
             break;
           }
@@ -264,6 +269,7 @@ public class SpoolDirectorySource extends AbstractSource
             logger.warn("The channel is full, and cannot write data now. The " +
                 "source will try again after " + backoffInterval +
                 " milliseconds");
+            sourceCounter.incrementChannelWriteFail();
             hitChannelFullException = true;
             backoffInterval = waitAndGetNewBackoffInterval(backoffInterval);
             continue;
@@ -271,6 +277,7 @@ public class SpoolDirectorySource extends AbstractSource
             logger.warn("The channel threw an exception, and cannot write data now. The " +
                 "source will try again after " + backoffInterval +
                 " milliseconds");
+            sourceCounter.incrementChannelWriteFail();
             hitChannelException = true;
             backoffInterval = waitAndGetNewBackoffInterval(backoffInterval);
             continue;
@@ -283,6 +290,11 @@ public class SpoolDirectorySource extends AbstractSource
         logger.error("FATAL: " + SpoolDirectorySource.this.toString() + ": " +
             "Uncaught exception in SpoolDirectorySource thread. " +
             "Restart or reconfigure Flume to continue processing.", t);
+        if (readingEvents) {
+          sourceCounter.incrementEventReadFail();
+        } else {
+          sourceCounter.incrementGenericProcessingFail();
+        }
         hasFatalError = true;
         Throwables.propagate(t);
       }

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
index c7e8248..1a0432c 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
@@ -96,8 +96,10 @@ public class SyslogTcpSource extends AbstractSource
           sourceCounter.incrementEventAcceptedCount();
         } catch (ChannelException ex) {
           logger.error("Error writting to channel, event dropped", ex);
+          sourceCounter.incrementChannelWriteFail();
         } catch (RuntimeException ex) {
           logger.error("Error parsing event from syslog stream, event dropped", ex);
+          sourceCounter.incrementEventReadFail();
           return;
         }
       }

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
index ae0b8ac..1e47f34 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
@@ -91,9 +91,11 @@ public class SyslogUDPSource extends AbstractSource
         sourceCounter.incrementEventAcceptedCount();
       } catch (ChannelException ex) {
         logger.error("Error writting to channel", ex);
+        sourceCounter.incrementChannelWriteFail();
         return;
       } catch (RuntimeException ex) {
         logger.error("Error parsing event from syslog stream, event dropped", ex);
+        sourceCounter.incrementEventReadFail();
         return;
       }
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
index 6a25e64..33c37f2 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
@@ -430,6 +430,7 @@ public class ThriftSource extends AbstractSource implements Configurable, EventD
       } catch (ChannelException ex) {
         logger.warn("Thrift source " + getName() + " could not append events " +
                     "to the channel.", ex);
+        sourceCounter.incrementChannelWriteFail();
         return Status.FAILED;
       }
       sourceCounter.incrementAppendAcceptedCount();
@@ -451,6 +452,7 @@ public class ThriftSource extends AbstractSource implements Configurable, EventD
         getChannelProcessor().processEventBatch(flumeEvents);
       } catch (ChannelException ex) {
         logger.warn("Thrift source %s could not append events to the channel.", getName());
+        sourceCounter.incrementChannelWriteFail();
         return Status.FAILED;
       }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
index ce6545a..d14bde2 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
@@ -265,12 +265,14 @@ public class HTTPSource extends AbstractSource implements
         events = handler.getEvents(request);
       } catch (HTTPBadRequestException ex) {
         LOG.warn("Received bad request from client. ", ex);
+        sourceCounter.incrementEventReadFail();
         response.sendError(HttpServletResponse.SC_BAD_REQUEST,
                 "Bad request from client. "
                 + ex.getMessage());
         return;
       } catch (Exception ex) {
         LOG.warn("Deserializer threw unexpected exception. ", ex);
+        sourceCounter.incrementEventReadFail();
         response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
                 "Deserializer threw unexpected exception. "
                 + ex.getMessage());
@@ -284,12 +286,14 @@ public class HTTPSource extends AbstractSource implements
         LOG.warn("Error appending event to channel. "
                 + "Channel might be full. Consider increasing the channel "
                 + "capacity or make sure the sinks perform faster.", ex);
+        sourceCounter.incrementChannelWriteFail();
         response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
                 "Error appending event to channel. Channel might be full."
                 + ex.getMessage());
         return;
       } catch (Exception ex) {
         LOG.warn("Unexpected error appending event to channel. ", ex);
+        sourceCounter.incrementGenericProcessingFail();
         response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
                 "Unexpected error while appending event to channel. "
                 + ex.getMessage());

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
index 422252e..6a5a69f 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
@@ -42,6 +42,7 @@ import org.apache.commons.lang.SystemUtils;
 import org.apache.commons.lang.mutable.MutableLong;
 import org.apache.flume.Event;
 import org.apache.flume.client.avro.ReliableSpoolingFileEventReader.DeletePolicy;
+import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.client.avro.ReliableSpoolingFileEventReader.TrackingPolicy;
 import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants;
 import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants.ConsumeOrder;
@@ -145,6 +146,7 @@ public class TestReliableSpoolingFileEventReader {
         .spoolDirectory(WORK_DIR)
         .includePattern("^file2$")
         .deletePolicy(DeletePolicy.IMMEDIATE.toString())
+        .sourceCounter(new SourceCounter("test"))
         .build();
 
     String[] beforeFiles = { "file0", "file1", "file2", "file3", "emptylineFile" };
@@ -166,6 +168,7 @@ public class TestReliableSpoolingFileEventReader {
             .spoolDirectory(WORK_DIR)
             .ignorePattern("^file2$")
             .deletePolicy(DeletePolicy.IMMEDIATE.toString())
+            .sourceCounter(new SourceCounter("test"))
             .build();
 
     String[] beforeFiles = { "file0", "file1", "file2", "file3", "emptylineFile" };
@@ -195,6 +198,7 @@ public class TestReliableSpoolingFileEventReader {
         .ignorePattern("^file[013]$")
         .includePattern("^file2$")
         .deletePolicy(DeletePolicy.IMMEDIATE.toString())
+        .sourceCounter(new SourceCounter("test"))
         .build();
 
     String[] beforeFiles = { "file0", "file1", "file2", "file3", "emptylineFile" };
@@ -222,6 +226,7 @@ public class TestReliableSpoolingFileEventReader {
         .ignorePattern("^file2$")
         .includePattern("^file2$")
         .deletePolicy(DeletePolicy.IMMEDIATE.toString())
+        .sourceCounter(new SourceCounter("test"))
         .build();
 
     String[] beforeFiles = { "file0", "file1", "file2", "file3", "emptylineFile" };
@@ -239,6 +244,7 @@ public class TestReliableSpoolingFileEventReader {
   public void testRepeatedCallsWithCommitAlways() throws IOException {
     ReliableEventReader reader =
         new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
+                                                     .sourceCounter(new SourceCounter("test"))
                                                      .build();
 
     final int expectedLines = 1 + 1 + 2 + 3 + 1;
@@ -261,6 +267,7 @@ public class TestReliableSpoolingFileEventReader {
     ReliableEventReader reader =
         new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
                                                      .trackerDirPath(trackerDirPath)
+                                                     .sourceCounter(new SourceCounter("test"))
                                                      .build();
 
     final int expectedLines = 1 + 1 + 2 + 3 + 1;
@@ -288,6 +295,7 @@ public class TestReliableSpoolingFileEventReader {
     ReliableEventReader reader =
         new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
                                                      .deletePolicy(DeletePolicy.IMMEDIATE.name())
+                                                     .sourceCounter(new SourceCounter("test"))
                                                      .build();
 
     List<File> before = listFiles(WORK_DIR);
@@ -311,6 +319,7 @@ public class TestReliableSpoolingFileEventReader {
   public void testNullConsumeOrder() throws IOException {
     new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
                                                  .consumeOrder(null)
+                                                 .sourceCounter(new SourceCounter("test"))
                                                  .build();
   }
 
@@ -319,6 +328,7 @@ public class TestReliableSpoolingFileEventReader {
     ReliableEventReader reader =
         new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
                                                      .consumeOrder(ConsumeOrder.RANDOM)
+                                                     .sourceCounter(new SourceCounter("test"))
                                                      .build();
     File fileName = new File(WORK_DIR, "new-file");
     FileUtils.write(fileName, "New file created in the end. Shoud be read randomly.\n");
@@ -340,6 +350,7 @@ public class TestReliableSpoolingFileEventReader {
     final ReliableEventReader reader =
         new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
                                                      .consumeOrder(ConsumeOrder.RANDOM)
+                                                     .sourceCounter(new SourceCounter("test"))
                                                      .build();
     File fileName = new File(WORK_DIR, "new-file");
     FileUtils.write(fileName, "New file created in the end. Shoud be read randomly.\n");
@@ -376,6 +387,7 @@ public class TestReliableSpoolingFileEventReader {
     ReliableEventReader reader =
         new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
                                                      .consumeOrder(ConsumeOrder.OLDEST)
+                                                     .sourceCounter(new SourceCounter("test"))
                                                      .build();
     File file1 = new File(WORK_DIR, "new-file1");
     File file2 = new File(WORK_DIR, "new-file2");
@@ -403,6 +415,7 @@ public class TestReliableSpoolingFileEventReader {
     ReliableEventReader reader =
         new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
                                                      .consumeOrder(ConsumeOrder.YOUNGEST)
+                                                     .sourceCounter(new SourceCounter("test"))
                                                      .build();
     File file1 = new File(WORK_DIR, "new-file1");
     File file2 = new File(WORK_DIR, "new-file2");
@@ -434,6 +447,7 @@ public class TestReliableSpoolingFileEventReader {
     ReliableEventReader reader =
         new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
                                                      .consumeOrder(ConsumeOrder.OLDEST)
+                                                     .sourceCounter(new SourceCounter("test"))
                                                      .build();
     File file1 = new File(WORK_DIR, "new-file1");
     File file2 = new File(WORK_DIR, "new-file2");
@@ -463,6 +477,7 @@ public class TestReliableSpoolingFileEventReader {
     ReliableEventReader reader =
         new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
                                                      .consumeOrder(ConsumeOrder.YOUNGEST)
+                                                     .sourceCounter(new SourceCounter("test"))
                                                      .build();
     File file1 = new File(WORK_DIR, "new-file1");
     File file2 = new File(WORK_DIR, "new-file2");
@@ -529,6 +544,7 @@ public class TestReliableSpoolingFileEventReader {
     ReliableEventReader reader =
         new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
                                                      .trackerDirPath(trackerDirPath)
+                                                     .sourceCounter(new SourceCounter("test"))
                                                      .build();
     final int expectedLines = 1;
     int seenLines = 0;
@@ -549,11 +565,12 @@ public class TestReliableSpoolingFileEventReader {
       dir = new File("target/test/work/" + this.getClass().getSimpleName() + "_large");
       Files.createParentDirs(new File(dir, "dummy"));
       ReliableEventReader reader =
-              new ReliableSpoolingFileEventReader.Builder().spoolDirectory(dir)
-                      .consumeOrder(order)
-                      .trackingPolicy(trackingPolicy.toString())
-                      .recursiveDirectorySearch(true)
-                      .build();
+          new ReliableSpoolingFileEventReader.Builder().spoolDirectory(dir)
+                                                       .consumeOrder(order)
+                                                       .trackingPolicy(trackingPolicy.toString())
+                                                       .recursiveDirectorySearch(true)
+                                                       .sourceCounter(new SourceCounter("test"))
+                                                       .build();
       Map<Long, List<String>> expected;
       if (comparator == null) {
         expected = new TreeMap<Long, List<String>>();

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestSpoolingFileLineReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestSpoolingFileLineReader.java b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestSpoolingFileLineReader.java
index bc3aa82..3d43bf4 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestSpoolingFileLineReader.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestSpoolingFileLineReader.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
+import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.serialization.LineDeserializer;
 import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants;
 import org.junit.After;
@@ -71,6 +72,7 @@ public class TestSpoolingFileLineReader {
           .spoolDirectory(tmpDir)
           .completedSuffix(completedSuffix)
           .deserializerContext(ctx)
+          .sourceCounter(new SourceCounter("dummy"))
           .build();
     } catch (IOException ioe) {
       throw Throwables.propagate(ioe);

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
index fdb1772..8b6f493 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
@@ -25,6 +25,7 @@ import org.apache.avro.ipc.NettyServer;
 import org.apache.avro.ipc.Server;
 import org.apache.avro.ipc.specific.SpecificResponder;
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -32,11 +33,13 @@ import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Sink;
 import org.apache.flume.Transaction;
 import org.apache.flume.api.RpcClient;
+import org.apache.flume.channel.BasicTransactionSemantics;
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.channel.ReplicatingChannelSelector;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.lifecycle.LifecycleController;
 import org.apache.flume.lifecycle.LifecycleState;
 import org.apache.flume.source.AvroSource;
@@ -50,6 +53,8 @@ import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 import org.jboss.netty.handler.ssl.SslHandler;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -173,6 +178,29 @@ public class TestAvroSink {
   }
 
   @Test
+  public void testChannelException() throws InterruptedException,
+          EventDeliveryException, InstantiationException, IllegalAccessException {
+    setUp();
+
+    Server server = createServer(new MockAvroServer());
+    server.start();
+    sink.start();
+    Channel channel = Mockito.mock(Channel.class);
+    Mockito.when(channel.take()).thenThrow(new ChannelException("dummy"));
+    Transaction transaction = Mockito.mock(BasicTransactionSemantics.class);
+    Mockito.when(channel.getTransaction()).thenReturn(transaction);
+    sink.setChannel(channel);
+
+    Sink.Status status = sink.process();
+
+    sink.stop();
+    server.close();
+
+    SinkCounter sinkCounter = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter");
+    Assert.assertEquals(1, sinkCounter.getChannelReadFail());
+  }
+
+  @Test
   public void testTimeout() throws InterruptedException,
       EventDeliveryException, InstantiationException, IllegalAccessException {
     setUp();
@@ -226,6 +254,9 @@ public class TestAvroSink {
     Assert.assertTrue(LifecycleController.waitForOneOf(sink,
         LifecycleState.STOP_OR_ERROR, 5000));
     server.close();
+
+    SinkCounter sinkCounter = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter");
+    Assert.assertEquals(2, sinkCounter.getEventWriteFail());
   }
 
   @Test
@@ -280,6 +311,10 @@ public class TestAvroSink {
     Assert.assertTrue(LifecycleController.waitForOneOf(sink,
         LifecycleState.STOP_OR_ERROR, 5000));
     server.close();
+
+    SinkCounter sinkCounter = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter");
+    Assert.assertEquals(5, sinkCounter.getEventWriteFail());
+    Assert.assertEquals(4, sinkCounter.getConnectionFailedCount());
   }
 
   @Test
@@ -608,6 +643,9 @@ public class TestAvroSink {
     if (failed) {
       Assert.fail("SSL-enabled sink successfully connected to a non-SSL-enabled server, that's wrong.");
     }
+
+    SinkCounter sinkCounter = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter");
+    Assert.assertEquals(1, sinkCounter.getEventWriteFail());
   }
 
   @Test
@@ -664,6 +702,8 @@ public class TestAvroSink {
     if (failed) {
       Assert.fail("SSL-enabled sink successfully connected to a server with an untrusted certificate when it should have failed");
     }
+    SinkCounter sinkCounter = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter");
+    Assert.assertEquals(1, sinkCounter.getEventWriteFail());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
index e4d2e2e..6b74e2d 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
@@ -20,17 +20,23 @@
 package org.apache.flume.sink;
 
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.BasicTransactionSemantics;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.channel.PseudoTxnMemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.SimpleEvent;
-import org.apache.flume.lifecycle.LifecycleException;
+import org.apache.flume.instrumentation.SinkCounter;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,7 +71,7 @@ public class TestRollingFileSink {
   }
 
   @Test
-  public void testLifecycle() throws InterruptedException, LifecycleException {
+  public void testLifecycle() {
     Context context = new Context();
 
     context.put("sink.directory", tmpDir.getPath());
@@ -77,7 +83,7 @@ public class TestRollingFileSink {
   }
 
   @Test
-  public void testAppend() throws InterruptedException, LifecycleException,
+  public void testAppend() throws InterruptedException,
       EventDeliveryException, IOException {
 
     Context context = new Context();
@@ -86,46 +92,11 @@ public class TestRollingFileSink {
     context.put("sink.rollInterval", "1");
     context.put("sink.batchSize", "1");
 
-    Configurables.configure(sink, context);
-
-    Channel channel = new PseudoTxnMemoryChannel();
-    Configurables.configure(channel, context);
-
-    sink.setChannel(channel);
-    sink.start();
-
-    for (int i = 0; i < 10; i++) {
-      Event event = new SimpleEvent();
-
-      event.setBody(("Test event " + i).getBytes());
-
-      channel.put(event);
-      sink.process();
-
-      Thread.sleep(500);
-    }
-
-    sink.stop();
-
-    for (String file : sink.getDirectory().list()) {
-      BufferedReader reader =
-          new BufferedReader(new FileReader(new File(sink.getDirectory(), file)));
-
-      String lastLine = null;
-      String currentLine = null;
-
-      while ((currentLine = reader.readLine()) != null) {
-        lastLine = currentLine;
-      }
-
-      logger.debug("Produced file:{} lastLine:{}", file, lastLine);
-
-      reader.close();
-    }
+    doTest(context);
   }
 
   @Test
-  public void testAppend2() throws InterruptedException, LifecycleException,
+  public void testAppend2() throws InterruptedException,
       EventDeliveryException, IOException {
 
     Context context = new Context();
@@ -134,48 +105,12 @@ public class TestRollingFileSink {
     context.put("sink.rollInterval", "0");
     context.put("sink.batchSize", "1");
 
-
-    Configurables.configure(sink, context);
-
-    Channel channel = new PseudoTxnMemoryChannel();
-    Configurables.configure(channel, context);
-
-    sink.setChannel(channel);
-    sink.start();
-
-    for (int i = 0; i < 10; i++) {
-      Event event = new SimpleEvent();
-
-      event.setBody(("Test event " + i).getBytes());
-
-      channel.put(event);
-      sink.process();
-
-      Thread.sleep(500);
-    }
-
-    sink.stop();
-
-    for (String file : sink.getDirectory().list()) {
-      BufferedReader reader =
-          new BufferedReader(new FileReader(new File(sink.getDirectory(), file)));
-
-      String lastLine = null;
-      String currentLine = null;
-
-      while ((currentLine = reader.readLine()) != null) {
-        lastLine = currentLine;
-        logger.debug("Produced file:{} lastLine:{}", file, lastLine);
-      }
-
-
-      reader.close();
-    }
+    doTest(context);
   }
 
   @Test
   public void testAppend3()
-      throws InterruptedException, LifecycleException, EventDeliveryException, IOException {
+      throws InterruptedException, EventDeliveryException, IOException {
     File tmpDir = new File("target/tmpLog");
     tmpDir.mkdirs();
     cleanDirectory(tmpDir);
@@ -187,46 +122,12 @@ public class TestRollingFileSink {
     context.put("sink.pathManager.prefix", "test3-");
     context.put("sink.pathManager.extension", "txt");
 
-    Configurables.configure(sink, context);
-
-    Channel channel = new PseudoTxnMemoryChannel();
-    Configurables.configure(channel, context);
-
-    sink.setChannel(channel);
-    sink.start();
-
-    for (int i = 0; i < 10; i++) {
-      Event event = new SimpleEvent();
-
-      event.setBody(("Test event " + i).getBytes());
-
-      channel.put(event);
-      sink.process();
-
-      Thread.sleep(500);
-    }
-
-    sink.stop();
-
-    for (String file : sink.getDirectory().list()) {
-      BufferedReader reader =
-          new BufferedReader(new FileReader(new File(sink.getDirectory(), file)));
-
-      String lastLine = null;
-      String currentLine = null;
-
-      while ((currentLine = reader.readLine()) != null) {
-        lastLine = currentLine;
-        logger.debug("Produced file:{} lastLine:{}", file, lastLine);
-      }
-
-      reader.close();
-    }
+    doTest(context);
   }
 
   @Test
   public void testRollTime()
-      throws InterruptedException, LifecycleException, EventDeliveryException, IOException {
+      throws InterruptedException, EventDeliveryException, IOException {
     File tmpDir = new File("target/tempLog");
     tmpDir.mkdirs();
     cleanDirectory(tmpDir);
@@ -239,10 +140,45 @@ public class TestRollingFileSink {
     context.put("sink.pathManager.prefix", "test4-");
     context.put("sink.pathManager.extension", "txt");
 
+    doTest(context);
+  }
+
+  @Test
+  public void testChannelException() throws InterruptedException, IOException {
+    // Checks that channelReadFail is 1 after the mocked channel's take() throws.
+    Context context = new Context();
+
+    context.put("sink.directory", tmpDir.getPath());
+    context.put("sink.rollInterval", "0");
+    context.put("sink.batchSize", "1");
+
+    Channel channel = Mockito.mock(Channel.class);
+    Mockito.when(channel.take()).thenThrow(new ChannelException("dummy"));
+    Transaction transaction = Mockito.mock(BasicTransactionSemantics.class);
+    Mockito.when(channel.getTransaction()).thenReturn(transaction);
+
+    try {
+      doTest(context, channel);
+    } catch (EventDeliveryException e) {
+      // Ignored: the test only asserts on the counter below.
+    }
+    SinkCounter sinkCounter = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter");
+    Assert.assertEquals(1, sinkCounter.getChannelReadFail());
+  }
+
+  private void doTest(Context context) throws EventDeliveryException, InterruptedException,
+      IOException {
+    doTest(context, null); // null => the overload creates a PseudoTxnMemoryChannel
+  }
+
+  private void doTest(Context context, Channel channel) throws EventDeliveryException,
+      InterruptedException, IOException {
     Configurables.configure(sink, context);
 
-    Channel channel = new PseudoTxnMemoryChannel();
-    Configurables.configure(channel, context);
+    if (channel == null) {
+      channel = new PseudoTxnMemoryChannel();
+      Configurables.configure(channel, context);
+    }
 
     sink.setChannel(channel);
     sink.start();

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
index fcfc72c..6f784ea 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer;
 import java.security.cert.X509Certificate;
 import java.nio.channels.ServerSocketChannel;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.Executors;
@@ -41,6 +42,7 @@ import javax.net.ssl.X509TrustManager;
 import org.apache.avro.ipc.NettyTransceiver;
 import org.apache.avro.ipc.specific.SpecificRequestor;
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -50,6 +52,7 @@ import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.channel.ReplicatingChannelSelector;
 import org.apache.flume.conf.Configurables;
+import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.lifecycle.LifecycleController;
 import org.apache.flume.lifecycle.LifecycleState;
 import org.apache.flume.source.avro.AvroFlumeEvent;
@@ -64,9 +67,15 @@ import org.jboss.netty.handler.ssl.SslHandler;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Mockito.doThrow;
+
 public class TestAvroSource {
 
   private static final Logger logger = LoggerFactory
@@ -578,4 +587,26 @@ public class TestAvroSource {
     Assert.assertEquals("Server is stopped", LifecycleState.STOP,
         source.getLifecycleState());
   }
+
+  @Test
+  public void testErrorCounterChannelWriteFail() throws Exception {
+    Context context = new Context();
+    context.put("port", String.valueOf(selectedPort = getFreePort())); // sets selectedPort too
+    context.put("bind", "0.0.0.0");
+    source.configure(context);
+    ChannelProcessor cp = Mockito.mock(ChannelProcessor.class); // stubbed to throw below
+    doThrow(new ChannelException("dummy")).when(cp).processEvent(any(Event.class));
+    doThrow(new ChannelException("dummy")).when(cp).processEventBatch(anyListOf(Event.class));
+    source.setChannelProcessor(cp);
+    source.start();
+    AvroFlumeEvent avroEvent = new AvroFlumeEvent();
+    avroEvent.setHeaders(new HashMap<CharSequence, CharSequence>());
+    avroEvent.setBody(ByteBuffer.wrap("Hello avro ssl".getBytes()));
+    source.append(avroEvent); // single-event path, invoked directly on the source
+    source.appendBatch(Arrays.asList(avroEvent)); // batch path, invoked directly
+    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(source, "sourceCounter");
+    Assert.assertEquals(2, sc.getChannelWriteFail()); // presumably one per call above
+    source.stop();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
index 9a6c5f4..8155a12 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
@@ -38,6 +38,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -57,6 +58,8 @@ import org.apache.mina.transport.socket.nio.NioSession;
 import org.joda.time.DateTime;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
 
 import static org.mockito.Mockito.*;
 
@@ -85,30 +88,28 @@ public class TestMultiportSyslogTCPSource {
     return msg1.getBytes();
   }
 
-  /**
-   * Basic test to exercise multiple-port parsing.
-   */
-  @Test
-  public void testMultiplePorts() throws IOException, ParseException {
-    MultiportSyslogTCPSource source = new MultiportSyslogTCPSource();
-    Channel channel = new MemoryChannel();
-
+  private List<Integer> testNPorts(MultiportSyslogTCPSource source, Channel channel,
+                                   List<Event> channelEvents, int numPorts,
+                                   ChannelProcessor channelProcessor) throws IOException {
     Context channelContext = new Context();
     channelContext.put("capacity", String.valueOf(2000));
     channelContext.put("transactionCapacity", String.valueOf(2000));
     Configurables.configure(channel, channelContext);
 
-    List<Channel> channels = Lists.newArrayList();
-    channels.add(channel);
+    if (channelProcessor == null) {
+      List<Channel> channels = Lists.newArrayList();
+      channels.add(channel);
 
-    ChannelSelector rcs = new ReplicatingChannelSelector();
-    rcs.setChannels(channels);
+      ChannelSelector rcs = new ReplicatingChannelSelector();
+      rcs.setChannels(channels);
 
-    source.setChannelProcessor(new ChannelProcessor(rcs));
-    Context context = new Context();
+      source.setChannelProcessor(new ChannelProcessor(rcs));
+    } else {
+      source.setChannelProcessor(channelProcessor);
+    }
 
-    List<Integer> portList = new ArrayList<>(1000);
-    while (portList.size() < 1000) {
+    List<Integer> portList = new ArrayList<>(numPorts);
+    while (portList.size() < numPorts) {
       int port = getFreePort();
       if (!portList.contains(port)) {
         portList.add(port);
@@ -116,26 +117,26 @@ public class TestMultiportSyslogTCPSource {
     }
 
     StringBuilder ports = new StringBuilder();
-    for (int i = 0; i < 1000; i++) {
+    for (int i = 0; i < numPorts; i++) {
       ports.append(String.valueOf(portList.get(i))).append(" ");
     }
+    Context context = new Context();
     context.put(SyslogSourceConfigurationConstants.CONFIG_PORTS,
         ports.toString().trim());
     source.configure(context);
     source.start();
 
     Socket syslogSocket;
-    for (int i = 0; i < 1000 ; i++) {
+    for (int i = 0; i < numPorts; i++) {
       syslogSocket = new Socket(
-              InetAddress.getLocalHost(), portList.get(i));
+          InetAddress.getLocalHost(), portList.get(i));
       syslogSocket.getOutputStream().write(getEvent(i));
       syslogSocket.close();
     }
 
-    List<Event> channelEvents = new ArrayList<Event>();
     Transaction txn = channel.getTransaction();
     txn.begin();
-    for (int i = 0; i < 1000; i++) {
+    for (int i = 0; i < numPorts; i++) {
       Event e = channel.take();
       if (e == null) {
         throw new NullPointerException("Event is null");
@@ -149,8 +150,26 @@ public class TestMultiportSyslogTCPSource {
     } finally {
       txn.close();
     }
+
+
+    return portList;
+  }
+
+  /**
+   * Basic test to exercise multiple-port parsing.
+   */
+  @Test
+  public void testMultiplePorts() throws IOException, ParseException {
+    MultiportSyslogTCPSource source = new MultiportSyslogTCPSource();
+    Channel channel = new MemoryChannel();
+    List<Event> channelEvents = new ArrayList<>();
+    int numPorts = 1000;
+
+    List<Integer> portList = testNPorts(source, channel, channelEvents,
+        numPorts, null);
+
     //Since events can arrive out of order, search for each event in the array
-    for (int i = 0; i < 1000 ; i++) {
+    for (int i = 0; i < numPorts ; i++) {
       Iterator<Event> iter = channelEvents.iterator();
       while (iter.hasNext()) {
         Event e = iter.next();
@@ -284,6 +303,24 @@ public class TestMultiportSyslogTCPSource {
     Assert.assertArrayEquals("Raw message data should be kept in body of event",
         badUtf8Seq, evt.getBody());
 
+    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(handler, "sourceCounter");
+    Assert.assertEquals(1, sc.getEventReadFail());
+
+  }
+
+  @Test
+  public void testHandlerGenericFail() throws Exception {
+    // Handler is given a UTF-8 ThreadSafeDecoder; the last constructor argument is null.
+    MultiportSyslogHandler handler = new MultiportSyslogHandler(
+        1000, 10, new ChannelProcessor(new ReplicatingChannelSelector()),
+        new SourceCounter("test"), "port",
+        new ThreadSafeDecoder(Charsets.UTF_8),
+        new ConcurrentHashMap<Integer, ThreadSafeDecoder>(),
+        null);
+    // exceptionCaught is called directly; its first argument is passed as null.
+    handler.exceptionCaught(null, new RuntimeException("dummy"));
+    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(handler, "sourceCounter");
+    Assert.assertEquals(1, sc.getGenericProcessingFail());
   }
 
   // helper function
@@ -391,6 +428,27 @@ public class TestMultiportSyslogTCPSource {
     evt = takeEvent(chan);
     Assert.assertNotNull("Event vanished!", evt);
     Assert.assertNull(evt.getHeaders().get(SyslogUtils.EVENT_STATUS));
+
+    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(handler, "sourceCounter");
+    Assert.assertEquals(1, sc.getEventReadFail());
+  }
+
+  @Test
+  public void testErrorCounterChannelWriteFail() throws Exception {
+    MultiportSyslogTCPSource source = new MultiportSyslogTCPSource();
+    Channel channel = new MemoryChannel();
+    List<Event> channelEvents = new ArrayList<>();
+    ChannelProcessor cp = Mockito.mock(ChannelProcessor.class);
+    doThrow(new ChannelException("dummy")).doNothing().when(cp)
+        .processEventBatch(anyListOf(Event.class)); // throws once, then does nothing
+    try {
+      testNPorts(source, channel, channelEvents, 1, cp);
+    } catch (Exception e) {
+      // Ignored; presumably testNPorts fails because the mock never feeds the channel.
+    }
+    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(source, "sourceCounter");
+    Assert.assertEquals(1, sc.getChannelWriteFail());
+    source.stop();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
index 92a698d..7c671a6 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
@@ -21,6 +21,8 @@ import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.ChannelFullException;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -28,13 +30,17 @@ import org.apache.flume.Transaction;
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.client.avro.ReliableSpoolingFileEventReader;
 import org.apache.flume.conf.Configurables;
+import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.lifecycle.LifecycleController;
 import org.apache.flume.lifecycle.LifecycleState;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
@@ -497,4 +503,68 @@ public class TestSpoolDirectorySource {
     txn.close();
     source.stop();
   }
+
+  // Shared setup: starts a standalone "dummy" SourceCounter and configures source on tmpDir.
+  private SourceCounter errorCounterCommonInit() {
+    SourceCounter sc = new SourceCounter("dummy");
+    sc.start();
+    Context context = new Context();
+    context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
+        tmpDir.getAbsolutePath());
+    Configurables.configure(source, context);
+    return sc; // callers hand this counter to SpoolDirectoryRunnable
+  }
+
+  @Test
+  public void testErrorCounters() throws Exception {
+    SourceCounter sc = errorCounterCommonInit();
+    // Channel writes fail on three consecutive calls, each with a different exception type.
+    ChannelProcessor cp = Mockito.mock(ChannelProcessor.class);
+    Mockito.doThrow(new ChannelException("dummy"))
+        .doThrow(new ChannelFullException("dummy"))
+        .doThrow(new RuntimeException("runtime"))
+        .when(cp).processEventBatch(Matchers.anyListOf(Event.class));
+    source.setChannelProcessor(cp);
+    // The reader returns the same one-event list three times, then throws IOException.
+    ReliableSpoolingFileEventReader reader = Mockito.mock(ReliableSpoolingFileEventReader.class);
+    List<Event> events = new ArrayList<>();
+    events.add(Mockito.mock(Event.class));
+    Mockito.doReturn(events)
+        .doReturn(events)
+        .doReturn(events)
+        .doThrow(new IOException("dummy"))
+        .when(reader).readEvents(Mockito.anyInt());
+    // Qualified inner-class creation: the runnable's enclosing instance is source.
+    Runnable runner = source. new SpoolDirectoryRunnable(reader, sc);
+    try {
+      runner.run();
+    } catch (Exception ex) {
+      //Expected
+    }
+    Assert.assertEquals(2, sc.getChannelWriteFail()); // presumably the two channel exceptions
+    Assert.assertEquals(1, sc.getGenericProcessingFail()); // presumably the RuntimeException
+  }
+
+  @Test
+  public void testErrorCounterEventReadFail() throws Exception {
+    SourceCounter sc = errorCounterCommonInit();
+    // Unstubbed mock processor: channel writes are no-ops in this test.
+    ChannelProcessor cp = Mockito.mock(ChannelProcessor.class);
+    source.setChannelProcessor(cp);
+    // The reader returns one event list, then throws IOException on the next call.
+    ReliableSpoolingFileEventReader reader = Mockito.mock(ReliableSpoolingFileEventReader.class);
+    List<Event> events = new ArrayList<>();
+    events.add(Mockito.mock(Event.class));
+    Mockito.doReturn(events)
+        .doThrow(new IOException("dummy"))
+        .when(reader).readEvents(Mockito.anyInt());
+    // Qualified inner-class creation: the runnable's enclosing instance is source.
+    Runnable runner = source. new SpoolDirectoryRunnable(reader, sc);
+    try {
+      runner.run();
+    } catch (Exception ex) {
+      //Expected
+    }
+    Assert.assertEquals(1, sc.getEventReadFail()); // presumably the stubbed IOException
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
index f07acc6..fbacdec 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
@@ -20,6 +20,7 @@ package org.apache.flume.source;
 
 import com.google.common.base.Charsets;
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -31,6 +32,7 @@ import org.apache.flume.conf.Configurables;
 import org.joda.time.DateTime;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
@@ -40,6 +42,9 @@ import java.net.Socket;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+
 public class TestSyslogTcpSource {
   private static final org.slf4j.Logger logger =
       LoggerFactory.getLogger(TestSyslogTcpSource.class);
@@ -159,5 +164,43 @@ public class TestSyslogTcpSource {
     Assert.assertEquals(10, source.getSourceCounter().getEventAcceptedCount());
     Assert.assertEquals(10, source.getSourceCounter().getEventReceivedCount());
   }
+
+  @Test
+  public void testSourceCounterChannelFail() throws Exception {
+    init("true");
+
+    errorCounterCommon(new ChannelException("dummy"));
+    // Poll up to 10 x 100 ms until the failure has been counted.
+    for (int i = 0; i < 10 && source.getSourceCounter().getChannelWriteFail() == 0; i++) {
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(1, source.getSourceCounter().getChannelWriteFail());
+  }
+
+  @Test
+  public void testSourceCounterEventFail() throws Exception {
+    init("true");
+    // A RuntimeException from the mocked processor is asserted to count as eventReadFail.
+    errorCounterCommon(new RuntimeException("dummy"));
+    // Poll up to 10 x 100 ms until the failure has been counted.
+    for (int i = 0; i < 10 && source.getSourceCounter().getEventReadFail() == 0; i++) {
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(1, source.getSourceCounter().getEventReadFail());
+  }
+
+  private void errorCounterCommon(Exception e) throws IOException {
+    ChannelProcessor cp = Mockito.mock(ChannelProcessor.class);
+    doThrow(e).when(cp).processEvent(any(Event.class)); // every processEvent call throws e
+    source.setChannelProcessor(cp);
+    // The throwing processor is installed before source.start().
+    source.start();
+    // Write some message to the syslog port
+    InetSocketAddress addr = source.getBoundAddress();
+    try (Socket syslogSocket = new Socket(addr.getAddress(), addr.getPort())) {
+      syslogSocket.getOutputStream().write(bodyWithTandH.getBytes());
+    }
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
index cb3860d..06122e6 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
@@ -20,6 +20,7 @@ package org.apache.flume.source;
 
 import com.google.common.base.Charsets;
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -31,6 +32,7 @@ import org.apache.flume.conf.Configurables;
 import org.joda.time.DateTime;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
@@ -43,6 +45,9 @@ import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+
 public class TestSyslogUdpSource {
   private static final org.slf4j.Logger logger =
       LoggerFactory.getLogger(TestSyslogUdpSource.class);
@@ -185,15 +190,7 @@ public class TestSyslogUdpSource {
   public void testSourceCounter() throws Exception {
     init("true");
 
-    source.start();
-    DatagramPacket datagramPacket = createDatagramPacket("test".getBytes());
-    sendDatagramPacket(datagramPacket);
-
-    Transaction txn = channel.getTransaction();
-    txn.begin();
-
-    channel.take();
-    commitAndCloseTransaction(txn);
+    doCounterCommon();
 
     // Retrying up to 10 times while the acceptedCount == 0 because the event processing in
     // SyslogUDPSource is handled on a separate thread by Netty so message delivery,
@@ -201,11 +198,47 @@ public class TestSyslogUdpSource {
     for (int i = 0; i < 10 && source.getSourceCounter().getEventAcceptedCount() == 0; i++) {
       Thread.sleep(100);
     }
-
     Assert.assertEquals(1, source.getSourceCounter().getEventAcceptedCount());
     Assert.assertEquals(1, source.getSourceCounter().getEventReceivedCount());
   }
 
+  private void doCounterCommon() throws IOException, InterruptedException {
+    source.start();
+    DatagramPacket datagramPacket = createDatagramPacket("test".getBytes());
+    sendDatagramPacket(datagramPacket); // packet targets source.getBoundAddress()
+  }
+
+  @Test
+  public void testSourceCounterChannelFail() throws Exception {
+    init("true");
+
+    ChannelProcessor cp = Mockito.mock(ChannelProcessor.class);
+    doThrow(new ChannelException("dummy")).when(cp).processEvent(any(Event.class));
+    source.setChannelProcessor(cp);
+
+    doCounterCommon();
+    // Poll up to 10 x 100 ms until the failure has been counted.
+    for (int i = 0; i < 10 && source.getSourceCounter().getChannelWriteFail() == 0; i++) {
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(1, source.getSourceCounter().getChannelWriteFail());
+  }
+
+  @Test
+  public void testSourceCounterReadFail() throws Exception {
+    init("true");
+    // A RuntimeException from the mocked processor is asserted to count as eventReadFail.
+    ChannelProcessor cp = Mockito.mock(ChannelProcessor.class);
+    doThrow(new RuntimeException("dummy")).when(cp).processEvent(any(Event.class));
+    source.setChannelProcessor(cp);
+    // Send one datagram, then poll up to 10 x 100 ms until the failure has been counted.
+    doCounterCommon();
+    for (int i = 0; i < 10 && source.getSourceCounter().getEventReadFail() == 0; i++) {
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(1, source.getSourceCounter().getEventReadFail());
+  }
+
   private DatagramPacket createDatagramPacket(byte[] payload) {
     InetSocketAddress addr = source.getBoundAddress();
     return new DatagramPacket(payload, payload.length, addr.getAddress(), addr.getPort());

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java
index f7c6361..d594276 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java
@@ -21,6 +21,7 @@ package org.apache.flume.source;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -36,10 +37,13 @@ import org.apache.flume.channel.ReplicatingChannelSelector;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
 
+import org.apache.flume.instrumentation.SourceCounter;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
 
 import javax.net.ssl.TrustManagerFactory;
 import javax.net.ssl.KeyManagerFactory;
@@ -47,6 +51,7 @@ import javax.net.ssl.KeyManagerFactory;
 import java.io.IOException;
 import java.net.ServerSocket;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -55,6 +60,10 @@ import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Mockito.doThrow;
+
 public class TestThriftSource {
 
   private ThriftSource source;
@@ -290,6 +299,34 @@ public class TestThriftSource {
     }
   }
 
+  @Test
+  public void testErrorCounterChannelWriteFail() throws Exception {
+    client = RpcClientFactory.getThriftInstance(props);
+    Context context = new Context();
+    context.put(ThriftSource.CONFIG_BIND, "0.0.0.0");
+    context.put(ThriftSource.CONFIG_PORT, String.valueOf(port));
+    source.configure(context);
+    ChannelProcessor cp = Mockito.mock(ChannelProcessor.class); // stubbed to throw below
+    doThrow(new ChannelException("dummy")).when(cp).processEvent(any(Event.class));
+    doThrow(new ChannelException("dummy")).when(cp).processEventBatch(anyListOf(Event.class));
+    source.setChannelProcessor(cp);
+    source.start();
+    Event event = EventBuilder.withBody("hello".getBytes());
+    try {
+      client.append(event);
+    } catch (EventDeliveryException e) {
+      // Ignored: the test only asserts on the counter below.
+    }
+    try {
+      client.appendBatch(Arrays.asList(event));
+    } catch (EventDeliveryException e) {
+      // Ignored: the test only asserts on the counter below.
+    }
+    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(source, "sourceCounter");
+    Assert.assertEquals(2, sc.getChannelWriteFail()); // presumably one per client call
+    source.stop();
+  }
+
   private class SubmitHelper implements Runnable {
     private final int i;
 

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java
index bb61dce..04eec24 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java
@@ -22,6 +22,7 @@ import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import junit.framework.Assert;
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -31,6 +32,7 @@ import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.channel.ReplicatingChannelSelector;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.JSONEvent;
+import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpOptions;
 import org.apache.http.client.methods.HttpPost;
@@ -44,6 +46,8 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
 
 import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
@@ -78,6 +82,8 @@ import java.util.Random;
 import java.util.Set;
 
 import static org.fest.reflect.core.Reflection.field;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Mockito.doThrow;
 
 /**
  *
@@ -204,7 +210,6 @@ public class TestHTTPSource {
     doTestForbidden(new HttpOptions("http://0.0.0.0:" + selectedPort));
   }
 
-
   private void doTestForbidden(HttpRequestBase request) throws Exception {
     HttpResponse response = httpClient.execute(request);
     Assert.assertEquals(HttpServletResponse.SC_FORBIDDEN,
@@ -248,6 +253,8 @@ public class TestHTTPSource {
 
     Assert.assertEquals(HttpServletResponse.SC_BAD_REQUEST,
             response.getStatusLine().getStatusCode());
+    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(source, "sourceCounter");
+    Assert.assertEquals(1, sc.getEventReadFail());
 
   }
 
@@ -265,6 +272,19 @@ public class TestHTTPSource {
   public void testBigBatchDeserializarionUTF32() throws Exception {
     testBatchWithVariousEncoding("UTF-32");
   }
+
+  @Test
+  public void testCounterGenericFail() throws Exception {
+    ChannelProcessor cp = Mockito.mock(ChannelProcessor.class);
+    doThrow(new RuntimeException("dummy")).when(cp).processEventBatch(anyListOf(Event.class));
+    ChannelProcessor oldCp = source.getChannelProcessor();
+    source.setChannelProcessor(cp); // swap in the mock that throws on every batch
+    testBatchWithVariousEncoding("UTF-8");
+    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(source, "sourceCounter");
+    Assert.assertEquals(1, sc.getGenericProcessingFail());
+    source.setChannelProcessor(oldCp); // NOTE(review): not reached if anything above throws
+  }
+
   @Test
   public void testSingleEvent() throws Exception {
     StringEntity input = new StringEntity("[{\"headers\" : {\"a\": \"b\"},\"body\":"
@@ -370,6 +390,8 @@ public class TestHTTPSource {
     HttpResponse response = putWithEncoding("UTF-8", 150).response;
     Assert.assertEquals(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
             response.getStatusLine().getStatusCode());
+    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(source, "sourceCounter");
+    Assert.assertEquals(1, sc.getChannelWriteFail());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index 40f2f4a..22306a0 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -441,10 +441,12 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     } catch (IOException eIO) {
       transaction.rollback();
       LOG.warn("HDFS IO error", eIO);
+      sinkCounter.incrementEventWriteFail();
       return Status.BACKOFF;
     } catch (Throwable th) {
       transaction.rollback();
       LOG.error("process failed", th);
+      sinkCounter.incrementEventWriteOrChannelFail(th);
       if (th instanceof Error) {
         throw (Error) th;
       } else {

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
index bbc0ba8..f86c96d 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
@@ -45,6 +45,7 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.commons.lang.StringUtils;
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.Clock;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -52,10 +53,12 @@ import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Sink.Status;
 import org.apache.flume.SystemClock;
 import org.apache.flume.Transaction;
+import org.apache.flume.channel.BasicTransactionSemantics;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.event.SimpleEvent;
+import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.lifecycle.LifecycleException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -75,6 +78,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
@@ -754,6 +758,8 @@ public class TestHDFSEventSink {
     sink.stop();
     verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
 
+    SinkCounter sc = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter");
+    Assert.assertEquals(1, sc.getEventWriteFail());
   }
 
 
@@ -930,6 +936,9 @@ public class TestHDFSEventSink {
     sink.stop();
 
     verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
+
+    SinkCounter sc = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter");
+    Assert.assertEquals(1, sc.getEventWriteFail());
   }
 
   /**
@@ -1170,6 +1179,9 @@ public class TestHDFSEventSink {
     }
 
     sink.stop();
+
+    SinkCounter sc = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter");
+    Assert.assertEquals(2, sc.getEventWriteFail());
   }
 
   /*
@@ -1625,4 +1637,29 @@ public class TestHDFSEventSink {
 
     sink.stop();
   }
+
+  @Test
+  public void testChannelException() {
+    LOG.debug("Starting...");
+    Context context = new Context();
+    context.put("hdfs.path", testPath);
+    context.put("keep-alive", "0");
+    Configurables.configure(sink, context);
+    Channel channel = Mockito.mock(Channel.class);
+    Mockito.when(channel.take()).thenThrow(new ChannelException("dummy")); // take() always fails
+    Mockito.when(channel.getTransaction())
+        .thenReturn(Mockito.mock(BasicTransactionSemantics.class));
+    sink.setChannel(channel);
+    sink.start();
+    try {
+      sink.process();
+    } catch (EventDeliveryException e) {
+      // Ignored on purpose: this test only checks the counter asserted below.
+    }
+    sink.stop();
+
+    SinkCounter sc = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter");
+    Assert.assertEquals(1, sc.getChannelReadFail());
+  }
+
 }