You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by sz...@apache.org on 2018/08/28 11:26:00 UTC

[2/2] flume git commit: FLUME-3050 add counters for error conditions and expose to monitor URL

FLUME-3050 add counters for error conditions and expose to monitor URL

Introducing error counters makes it easier to monitor problems.
Errors are also categorized, which should help in setting up better
monitoring solutions.

Concept: an error is when an Exception is thrown or an ERROR level log is
written during event processing. When an error occurs, at least one error
counter is incremented at least once (preferably exactly one counter, once).
Errors during event processing are counted.
Initialization errors are not handled here.
3 types of errors are differentiated.
- Channel read/write errors: reading from or writing to the channel
  throws a ChannelException.
- Event read/write errors. E.g.: A source cannot read an event due to
  a decoding or parsing error.
- Generic errors - e.g.: TaildirSource cannot write position file.

This closes #222

Reviewers: Peter Turcsanyi, Ferenc Szabo

(Endre Major via Ferenc Szabo)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/3a22cd4d
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/3a22cd4d
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/3a22cd4d

Branch: refs/heads/trunk
Commit: 3a22cd4d8bc47f0e7c30bba93186ad0cf602c07e
Parents: 368776f
Author: Endre Major <em...@cloudera.com>
Authored: Tue Aug 28 13:25:08 2018 +0200
Committer: Ferenc Szabo <sz...@apache.org>
Committed: Tue Aug 28 13:25:08 2018 +0200

----------------------------------------------------------------------
 .../apache/flume/client/avro/AvroCLIClient.java |   5 +-
 .../avro/ReliableSpoolingFileEventReader.java   |  20 ++-
 .../flume/instrumentation/SinkCounter.java      |  36 +++-
 .../flume/instrumentation/SinkCounterMBean.java |   5 +
 .../flume/instrumentation/SourceCounter.java    |  50 +++++-
 .../instrumentation/SourceCounterMBean.java     |   7 +
 .../org/apache/flume/sink/AbstractRpcSink.java  |   2 +
 .../org/apache/flume/sink/RollingFileSink.java  |   1 +
 .../org/apache/flume/source/AvroSource.java     |   2 +
 .../flume/source/MultiportSyslogTCPSource.java  |   4 +
 .../flume/source/SequenceGeneratorSource.java   |   1 +
 .../flume/source/SpoolDirectorySource.java      |  14 +-
 .../apache/flume/source/SyslogTcpSource.java    |   2 +
 .../apache/flume/source/SyslogUDPSource.java    |   2 +
 .../org/apache/flume/source/ThriftSource.java   |   2 +
 .../apache/flume/source/http/HTTPSource.java    |   4 +
 .../TestReliableSpoolingFileEventReader.java    |  27 ++-
 .../client/avro/TestSpoolingFileLineReader.java |   2 +
 .../org/apache/flume/sink/TestAvroSink.java     |  40 +++++
 .../apache/flume/sink/TestRollingFileSink.java  | 168 ++++++-------------
 .../org/apache/flume/source/TestAvroSource.java |  31 ++++
 .../source/TestMultiportSyslogTCPSource.java    | 102 ++++++++---
 .../flume/source/TestSpoolDirectorySource.java  |  70 ++++++++
 .../flume/source/TestSyslogTcpSource.java       |  43 +++++
 .../flume/source/TestSyslogUdpSource.java       |  53 ++++--
 .../apache/flume/source/TestThriftSource.java   |  37 ++++
 .../flume/source/http/TestHTTPSource.java       |  24 ++-
 .../apache/flume/sink/hdfs/HDFSEventSink.java   |   2 +
 .../flume/sink/hdfs/TestHDFSEventSink.java      |  37 ++++
 flume-ng-sinks/flume-hive-sink/pom.xml          |   6 +
 .../org/apache/flume/sink/hive/HiveSink.java    |   1 +
 .../apache/flume/sink/hive/TestHiveSink.java    |  87 +++++-----
 .../org/apache/flume/sink/http/HttpSink.java    |   2 +
 .../apache/flume/sink/http/TestHttpSink.java    |  16 ++
 .../apache/flume/sink/hbase2/HBase2Sink.java    |   1 +
 .../flume/sink/hbase2/TestHBase2Sink.java       |   6 +
 flume-ng-sinks/flume-ng-kafka-sink/pom.xml      |   6 +
 .../org/apache/flume/sink/kafka/KafkaSink.java  |   1 +
 .../apache/flume/sink/kafka/TestKafkaSink.java  |  19 ++-
 .../flume-ng-morphline-solr-sink/pom.xml        |   5 +
 .../sink/solr/morphline/MorphlineSink.java      |   1 +
 .../solr/morphline/TestMorphlineSolrSink.java   |  19 +++
 .../org/apache/flume/source/jms/JMSSource.java  |   3 +
 .../apache/flume/source/jms/TestJMSSource.java  |  25 +++
 .../apache/flume/source/kafka/KafkaSource.java  |   1 +
 .../flume/source/kafka/TestKafkaSource.java     |  34 ++++
 flume-ng-sources/flume-scribe-source/pom.xml    |   6 +
 .../flume/source/scribe/ScribeSource.java       |   1 +
 .../flume/source/scribe/TestScribeSource.java   |  32 +++-
 flume-ng-sources/flume-taildir-source/pom.xml   |   7 +
 .../flume/source/taildir/TaildirSource.java     |   5 +
 .../flume/source/taildir/TestTaildirSource.java |  92 ++++++++--
 52 files changed, 940 insertions(+), 229 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
index dd2aeef..242c821 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
@@ -43,6 +43,7 @@ import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.annotations.InterfaceStability;
 import org.apache.flume.api.RpcClient;
 import org.apache.flume.api.RpcClientFactory;
+import org.apache.flume.instrumentation.SourceCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -203,7 +204,9 @@ public class AvroCLIClient {
         reader = new SimpleTextLineEventReader(new FileReader(new File(fileName)));
       } else if (dirName != null) {
         reader = new ReliableSpoolingFileEventReader.Builder()
-            .spoolDirectory(new File(dirName)).build();
+            .spoolDirectory(new File(dirName))
+            .sourceCounter(new SourceCounter("avrocli"))
+            .build();
       } else {
         reader = new SimpleTextLineEventReader(new InputStreamReader(System.in));
       }

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
index 830d21b..e8d5a36 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
@@ -29,6 +29,7 @@ import org.apache.flume.FlumeException;
 import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.annotations.InterfaceStability;
 import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.serialization.DecodeErrorPolicy;
 import org.apache.flume.serialization.DurablePositionTracker;
 import org.apache.flume.serialization.EventDeserializer;
@@ -59,6 +60,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Set;
+import java.util.UUID;
 import java.util.regex.Pattern;
 
 /**
@@ -110,6 +112,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
   private final DecodeErrorPolicy decodeErrorPolicy;
   private final ConsumeOrder consumeOrder;
   private final boolean recursiveDirectorySearch;
+  private final SourceCounter sourceCounter;
 
   private Optional<FileInfo> currentFile = Optional.absent();
   /** Always contains the last file from which lines have been read. */
@@ -133,8 +136,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
       String deserializerType, Context deserializerContext,
       String deletePolicy, String trackingPolicy, String inputCharset,
       DecodeErrorPolicy decodeErrorPolicy,
-      ConsumeOrder consumeOrder,
-      boolean recursiveDirectorySearch) throws IOException {
+      ConsumeOrder consumeOrder, boolean recursiveDirectorySearch,
+                                          SourceCounter sourceCounter) throws IOException {
 
     // Sanity checks
     Preconditions.checkNotNull(spoolDirectory);
@@ -147,6 +150,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
     Preconditions.checkNotNull(deletePolicy);
     Preconditions.checkNotNull(trackingPolicy);
     Preconditions.checkNotNull(inputCharset);
+    Preconditions.checkNotNull(sourceCounter);
 
     // validate delete policy
     if (!deletePolicy.equalsIgnoreCase(DeletePolicy.NEVER.name()) &&
@@ -209,6 +213,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
     this.decodeErrorPolicy = Preconditions.checkNotNull(decodeErrorPolicy);
     this.consumeOrder = Preconditions.checkNotNull(consumeOrder);
     this.recursiveDirectorySearch = recursiveDirectorySearch;
+    this.sourceCounter = sourceCounter;
 
     trackerDirectory = new File(trackerDirPath);
 
@@ -287,6 +292,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
     } catch (IOException e) {
       logger.error("I/O exception occurred while listing directories. " +
                    "Files already matched will be returned. " + directory, e);
+      sourceCounter.incrementGenericProcessingFail();
     }
 
     return candidateFiles;
@@ -508,6 +514,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
         if (!deleted) {
           logger.error("Unable to delete file " + fileToRoll.getAbsolutePath() +
               ". It will likely be ingested another time.");
+          sourceCounter.incrementGenericProcessingFail();
         }
       } else {
         String message = "File name has been re-used with different" +
@@ -681,6 +688,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
       return Optional.absent();
     } catch (IOException e) {
       logger.error("Exception opening file: " + file, e);
+      sourceCounter.incrementGenericProcessingFail();
       return Optional.absent();
     }
   }
@@ -777,6 +785,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
         SpoolDirectorySourceConfigurationConstants.DEFAULT_CONSUME_ORDER;
     private boolean recursiveDirectorySearch =
         SpoolDirectorySourceConfigurationConstants.DEFAULT_RECURSIVE_DIRECTORY_SEARCH;
+    private SourceCounter sourceCounter;
 
     public Builder spoolDirectory(File directory) {
       this.spoolDirectory = directory;
@@ -863,12 +872,17 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
       return this;
     }
 
+    public Builder sourceCounter(SourceCounter sourceCounter) {
+      this.sourceCounter = sourceCounter;
+      return this;
+    }
+
     public ReliableSpoolingFileEventReader build() throws IOException {
       return new ReliableSpoolingFileEventReader(spoolDirectory, completedSuffix,
           includePattern, ignorePattern, trackerDirPath, annotateFileName, fileNameHeader,
           annotateBaseName, baseNameHeader, deserializerType,
           deserializerContext, deletePolicy, trackingPolicy, inputCharset, decodeErrorPolicy,
-          consumeOrder, recursiveDirectorySearch);
+          consumeOrder, recursiveDirectorySearch, sourceCounter);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java
index 534adc8..8c77959 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java
@@ -18,6 +18,7 @@
 package org.apache.flume.instrumentation;
 
 import org.apache.commons.lang.ArrayUtils;
+import org.apache.flume.ChannelException;
 
 public class SinkCounter extends MonitoredCounterGroup implements
     SinkCounterMBean {
@@ -46,11 +47,18 @@ public class SinkCounter extends MonitoredCounterGroup implements
   private static final String COUNTER_EVENT_DRAIN_SUCCESS =
       "sink.event.drain.sucess";
 
+  private static final String COUNTER_EVENT_WRITE_FAIL =
+      "sink.event.write.fail";
+
+  private static final String COUNTER_CHANNEL_READ_FAIL =
+      "sink.channel.read.fail";
+
   private static final String[] ATTRIBUTES = {
     COUNTER_CONNECTION_CREATED, COUNTER_CONNECTION_CLOSED,
     COUNTER_CONNECTION_FAILED, COUNTER_BATCH_EMPTY,
     COUNTER_BATCH_UNDERFLOW, COUNTER_BATCH_COMPLETE,
-    COUNTER_EVENT_DRAIN_ATTEMPT, COUNTER_EVENT_DRAIN_SUCCESS
+    COUNTER_EVENT_DRAIN_ATTEMPT, COUNTER_EVENT_DRAIN_SUCCESS,
+    COUNTER_EVENT_WRITE_FAIL, COUNTER_CHANNEL_READ_FAIL
   };
 
   public SinkCounter(String name) {
@@ -141,4 +149,30 @@ public class SinkCounter extends MonitoredCounterGroup implements
   public long addToEventDrainSuccessCount(long delta) {
     return addAndGet(COUNTER_EVENT_DRAIN_SUCCESS, delta);
   }
+
+  public long incrementEventWriteFail() {
+    return increment(COUNTER_EVENT_WRITE_FAIL);
+  }
+
+  @Override
+  public long getEventWriteFail() {
+    return get(COUNTER_EVENT_WRITE_FAIL);
+  }
+
+  public long incrementChannelReadFail() {
+    return increment(COUNTER_CHANNEL_READ_FAIL);
+  }
+
+  @Override
+  public long getChannelReadFail() {
+    return get(COUNTER_CHANNEL_READ_FAIL);
+  }
+
+  public long incrementEventWriteOrChannelFail(Throwable t) {
+    if (t instanceof ChannelException) {
+      return incrementChannelReadFail();
+    }
+    return incrementEventWriteFail();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java
index 472a4dd..0ca265a 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java
@@ -48,4 +48,9 @@ public interface SinkCounterMBean {
   long getStopTime();
 
   String getType();
+
+  long getEventWriteFail();
+
+  long getChannelReadFail();
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java
index f96694e..0e4d971 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java
@@ -19,6 +19,7 @@
 package org.apache.flume.instrumentation;
 
 import org.apache.commons.lang.ArrayUtils;
+import org.apache.flume.ChannelException;
 
 public class SourceCounter extends MonitoredCounterGroup implements
     SourceCounterMBean {
@@ -37,15 +38,25 @@ public class SourceCounter extends MonitoredCounterGroup implements
       "src.append-batch.received";
   private static final String COUNTER_APPEND_BATCH_ACCEPTED =
       "src.append-batch.accepted";
-  
+
   private static final String COUNTER_OPEN_CONNECTION_COUNT =
           "src.open-connection.count";
 
+  private static final String COUNTER_EVENT_READ_FAIL =
+      "src.event.read.fail";
+
+  private static final String COUNTER_GENERIC_PROCESSING_FAIL =
+      "src.generic.processing.fail";
+
+  private static final String COUNTER_CHANNEL_WRITE_FAIL =
+      "src.channel.write.fail";
+
   private static final String[] ATTRIBUTES = {
     COUNTER_EVENTS_RECEIVED, COUNTER_EVENTS_ACCEPTED,
     COUNTER_APPEND_RECEIVED, COUNTER_APPEND_ACCEPTED,
     COUNTER_APPEND_BATCH_RECEIVED, COUNTER_APPEND_BATCH_ACCEPTED,
-    COUNTER_OPEN_CONNECTION_COUNT
+    COUNTER_OPEN_CONNECTION_COUNT, COUNTER_EVENT_READ_FAIL,
+    COUNTER_CHANNEL_WRITE_FAIL, COUNTER_GENERIC_PROCESSING_FAIL
   };
 
   public SourceCounter(String name) {
@@ -126,4 +137,39 @@ public class SourceCounter extends MonitoredCounterGroup implements
   public void setOpenConnectionCount(long openConnectionCount) {
     set(COUNTER_OPEN_CONNECTION_COUNT, openConnectionCount);
   }
+
+  public long incrementEventReadFail() {
+    return increment(COUNTER_EVENT_READ_FAIL);
+  }
+
+  @Override
+  public long getEventReadFail() {
+    return get(COUNTER_EVENT_READ_FAIL);
+  }
+
+  public long incrementChannelWriteFail() {
+    return increment(COUNTER_CHANNEL_WRITE_FAIL);
+  }
+
+  @Override
+  public long getChannelWriteFail() {
+    return get(COUNTER_CHANNEL_WRITE_FAIL);
+  }
+
+  public long incrementGenericProcessingFail() {
+    return increment(COUNTER_GENERIC_PROCESSING_FAIL);
+  }
+
+  @Override
+  public long getGenericProcessingFail() {
+    return get(COUNTER_GENERIC_PROCESSING_FAIL);
+  }
+
+  public long incrementEventReadOrChannelFail(Throwable t) {
+    if (t instanceof ChannelException) {
+      return incrementChannelWriteFail();
+    }
+    return incrementEventReadFail();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java
index 5ccbed4..bfbfe3e 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java
@@ -45,4 +45,11 @@ public interface SourceCounterMBean {
   String getType();
 
   long getOpenConnectionCount();
+
+  long getEventReadFail();
+
+  long getChannelWriteFail();
+
+  long getGenericProcessingFail();
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
index f6024aa..5a5993a 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
@@ -383,8 +383,10 @@ public abstract class AbstractRpcSink extends AbstractSink implements Configurab
       } else if (t instanceof ChannelException) {
         logger.error("Rpc Sink " + getName() + ": Unable to get event from" +
             " channel " + channel.getName() + ". Exception follows.", t);
+        sinkCounter.incrementChannelReadFail();
         status = Status.BACKOFF;
       } else {
+        sinkCounter.incrementEventWriteFail();
         destroyConnection();
         throw new EventDeliveryException("Failed to send events", t);
       }

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
index ee4b947..9b0827a 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
@@ -220,6 +220,7 @@ public class RollingFileSink extends AbstractSink implements Configurable {
       transaction.commit();
       sinkCounter.addToEventDrainSuccessCount(eventAttemptCounter);
     } catch (Exception ex) {
+      sinkCounter.incrementEventWriteOrChannelFail(ex);
       transaction.rollback();
       throw new EventDeliveryException("Failed to process transaction", ex);
     } finally {

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
index 623e61e..a105bbe 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
@@ -363,6 +363,7 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
     } catch (ChannelException ex) {
       logger.warn("Avro source " + getName() + ": Unable to process event. " +
           "Exception follows.", ex);
+      sourceCounter.incrementChannelWriteFail();
       return Status.FAILED;
     }
 
@@ -393,6 +394,7 @@ public class AvroSource extends AbstractSource implements EventDrivenSource,
     } catch (Throwable t) {
       logger.error("Avro source " + getName() + ": Unable to process event " +
           "batch. Exception follows.", t);
+      sourceCounter.incrementChannelWriteFail();
       if (t instanceof Error) {
         throw (Error) t;
       }

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
index 201d8e7..3c59b47 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/MultiportSyslogTCPSource.java
@@ -243,6 +243,7 @@ public class MultiportSyslogTCPSource extends AbstractSource implements
     public void exceptionCaught(IoSession session, Throwable cause)
         throws Exception {
       logger.error("Error in syslog message handler", cause);
+      sourceCounter.incrementGenericProcessingFail();
       if (cause instanceof Error) {
         Throwables.propagate(cause);
       }
@@ -320,6 +321,7 @@ public class MultiportSyslogTCPSource extends AbstractSource implements
           sourceCounter.addToEventAcceptedCount(numEvents);
         } catch (Throwable t) {
           logger.error("Error writing to channel, event dropped", t);
+          sourceCounter.incrementEventReadOrChannelFail(t);
           if (t instanceof Error) {
             Throwables.propagate(t);
           }
@@ -341,6 +343,7 @@ public class MultiportSyslogTCPSource extends AbstractSource implements
       } catch (Throwable t) {
         logger.info("Error decoding line with charset (" + decoder.charset() +
             "). Exception follows.", t);
+        sourceCounter.incrementEventReadFail();
 
         if (t instanceof Error) {
           Throwables.propagate(t);
@@ -377,6 +380,7 @@ public class MultiportSyslogTCPSource extends AbstractSource implements
         event.getHeaders().put(SyslogUtils.EVENT_STATUS,
             SyslogUtils.SyslogStatus.INVALID.getSyslogStatus());
         logger.debug("Error parsing syslog event", ex);
+        sourceCounter.incrementEventReadFail();
       }
 
       return event;

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
index eaa9ef3..e494bfb 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
@@ -93,6 +93,7 @@ public class SequenceGeneratorSource extends AbstractPollableSource implements
       eventsSent = eventsSentTX;
     } catch (ChannelException ex) {
       logger.error( getName() + " source could not write to channel.", ex);
+      sourceCounter.incrementChannelWriteFail();
     }
 
     return status;

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
index 68bbe7f..305ca3b 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
@@ -106,6 +106,7 @@ public class SpoolDirectorySource extends AbstractSource
           .consumeOrder(consumeOrder)
           .recursiveDirectorySearch(recursiveDirectorySearch)
           .trackingPolicy(trackingPolicy)
+          .sourceCounter(sourceCounter)
           .build();
     } catch (IOException ioe) {
       throw new FlumeException("Error instantiating spooling event parser",
@@ -235,7 +236,8 @@ public class SpoolDirectorySource extends AbstractSource
     return recursiveDirectorySearch;
   }
 
-  private class SpoolDirectoryRunnable implements Runnable {
+  @VisibleForTesting
+  protected class SpoolDirectoryRunnable implements Runnable {
     private ReliableSpoolingFileEventReader reader;
     private SourceCounter sourceCounter;
 
@@ -248,9 +250,12 @@ public class SpoolDirectorySource extends AbstractSource
     @Override
     public void run() {
       int backoffInterval = 250;
+      boolean readingEvents = false;
       try {
         while (!Thread.interrupted()) {
+          readingEvents = true;
           List<Event> events = reader.readEvents(batchSize);
+          readingEvents = false;
           if (events.isEmpty()) {
             break;
           }
@@ -264,6 +269,7 @@ public class SpoolDirectorySource extends AbstractSource
             logger.warn("The channel is full, and cannot write data now. The " +
                 "source will try again after " + backoffInterval +
                 " milliseconds");
+            sourceCounter.incrementChannelWriteFail();
             hitChannelFullException = true;
             backoffInterval = waitAndGetNewBackoffInterval(backoffInterval);
             continue;
@@ -271,6 +277,7 @@ public class SpoolDirectorySource extends AbstractSource
             logger.warn("The channel threw an exception, and cannot write data now. The " +
                 "source will try again after " + backoffInterval +
                 " milliseconds");
+            sourceCounter.incrementChannelWriteFail();
             hitChannelException = true;
             backoffInterval = waitAndGetNewBackoffInterval(backoffInterval);
             continue;
@@ -283,6 +290,11 @@ public class SpoolDirectorySource extends AbstractSource
         logger.error("FATAL: " + SpoolDirectorySource.this.toString() + ": " +
             "Uncaught exception in SpoolDirectorySource thread. " +
             "Restart or reconfigure Flume to continue processing.", t);
+        if (readingEvents) {
+          sourceCounter.incrementEventReadFail();
+        } else {
+          sourceCounter.incrementGenericProcessingFail();
+        }
         hasFatalError = true;
         Throwables.propagate(t);
       }

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
index c7e8248..1a0432c 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogTcpSource.java
@@ -96,8 +96,10 @@ public class SyslogTcpSource extends AbstractSource
           sourceCounter.incrementEventAcceptedCount();
         } catch (ChannelException ex) {
           logger.error("Error writting to channel, event dropped", ex);
+          sourceCounter.incrementChannelWriteFail();
         } catch (RuntimeException ex) {
           logger.error("Error parsing event from syslog stream, event dropped", ex);
+          sourceCounter.incrementEventReadFail();
           return;
         }
       }

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
index ae0b8ac..1e47f34 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SyslogUDPSource.java
@@ -91,9 +91,11 @@ public class SyslogUDPSource extends AbstractSource
         sourceCounter.incrementEventAcceptedCount();
       } catch (ChannelException ex) {
         logger.error("Error writting to channel", ex);
+        sourceCounter.incrementChannelWriteFail();
         return;
       } catch (RuntimeException ex) {
         logger.error("Error parsing event from syslog stream, event dropped", ex);
+        sourceCounter.incrementEventReadFail();
         return;
       }
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
index 6a25e64..33c37f2 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
@@ -430,6 +430,7 @@ public class ThriftSource extends AbstractSource implements Configurable, EventD
       } catch (ChannelException ex) {
         logger.warn("Thrift source " + getName() + " could not append events " +
                     "to the channel.", ex);
+        sourceCounter.incrementChannelWriteFail();
         return Status.FAILED;
       }
       sourceCounter.incrementAppendAcceptedCount();
@@ -451,6 +452,7 @@ public class ThriftSource extends AbstractSource implements Configurable, EventD
         getChannelProcessor().processEventBatch(flumeEvents);
       } catch (ChannelException ex) {
         logger.warn("Thrift source %s could not append events to the channel.", getName());
+        sourceCounter.incrementChannelWriteFail();
         return Status.FAILED;
       }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
index ce6545a..d14bde2 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/http/HTTPSource.java
@@ -265,12 +265,14 @@ public class HTTPSource extends AbstractSource implements
         events = handler.getEvents(request);
       } catch (HTTPBadRequestException ex) {
         LOG.warn("Received bad request from client. ", ex);
+        sourceCounter.incrementEventReadFail();
         response.sendError(HttpServletResponse.SC_BAD_REQUEST,
                 "Bad request from client. "
                 + ex.getMessage());
         return;
       } catch (Exception ex) {
         LOG.warn("Deserializer threw unexpected exception. ", ex);
+        sourceCounter.incrementEventReadFail();
         response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
                 "Deserializer threw unexpected exception. "
                 + ex.getMessage());
@@ -284,12 +286,14 @@ public class HTTPSource extends AbstractSource implements
         LOG.warn("Error appending event to channel. "
                 + "Channel might be full. Consider increasing the channel "
                 + "capacity or make sure the sinks perform faster.", ex);
+        sourceCounter.incrementChannelWriteFail();
         response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
                 "Error appending event to channel. Channel might be full."
                 + ex.getMessage());
         return;
       } catch (Exception ex) {
         LOG.warn("Unexpected error appending event to channel. ", ex);
+        sourceCounter.incrementGenericProcessingFail();
         response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
                 "Unexpected error while appending event to channel. "
                 + ex.getMessage());

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
index 422252e..6a5a69f 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
@@ -42,6 +42,7 @@ import org.apache.commons.lang.SystemUtils;
 import org.apache.commons.lang.mutable.MutableLong;
 import org.apache.flume.Event;
 import org.apache.flume.client.avro.ReliableSpoolingFileEventReader.DeletePolicy;
+import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.client.avro.ReliableSpoolingFileEventReader.TrackingPolicy;
 import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants;
 import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants.ConsumeOrder;
@@ -145,6 +146,7 @@ public class TestReliableSpoolingFileEventReader {
         .spoolDirectory(WORK_DIR)
         .includePattern("^file2$")
         .deletePolicy(DeletePolicy.IMMEDIATE.toString())
+        .sourceCounter(new SourceCounter("test"))
         .build();
 
     String[] beforeFiles = { "file0", "file1", "file2", "file3", "emptylineFile" };
@@ -166,6 +168,7 @@ public class TestReliableSpoolingFileEventReader {
             .spoolDirectory(WORK_DIR)
             .ignorePattern("^file2$")
             .deletePolicy(DeletePolicy.IMMEDIATE.toString())
+            .sourceCounter(new SourceCounter("test"))
             .build();
 
     String[] beforeFiles = { "file0", "file1", "file2", "file3", "emptylineFile" };
@@ -195,6 +198,7 @@ public class TestReliableSpoolingFileEventReader {
         .ignorePattern("^file[013]$")
         .includePattern("^file2$")
         .deletePolicy(DeletePolicy.IMMEDIATE.toString())
+        .sourceCounter(new SourceCounter("test"))
         .build();
 
     String[] beforeFiles = { "file0", "file1", "file2", "file3", "emptylineFile" };
@@ -222,6 +226,7 @@ public class TestReliableSpoolingFileEventReader {
         .ignorePattern("^file2$")
         .includePattern("^file2$")
         .deletePolicy(DeletePolicy.IMMEDIATE.toString())
+        .sourceCounter(new SourceCounter("test"))
         .build();
 
     String[] beforeFiles = { "file0", "file1", "file2", "file3", "emptylineFile" };
@@ -239,6 +244,7 @@ public class TestReliableSpoolingFileEventReader {
   public void testRepeatedCallsWithCommitAlways() throws IOException {
     ReliableEventReader reader =
         new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
+                                                     .sourceCounter(new SourceCounter("test"))
                                                      .build();
 
     final int expectedLines = 1 + 1 + 2 + 3 + 1;
@@ -261,6 +267,7 @@ public class TestReliableSpoolingFileEventReader {
     ReliableEventReader reader =
         new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
                                                      .trackerDirPath(trackerDirPath)
+                                                     .sourceCounter(new SourceCounter("test"))
                                                      .build();
 
     final int expectedLines = 1 + 1 + 2 + 3 + 1;
@@ -288,6 +295,7 @@ public class TestReliableSpoolingFileEventReader {
     ReliableEventReader reader =
         new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
                                                      .deletePolicy(DeletePolicy.IMMEDIATE.name())
+                                                     .sourceCounter(new SourceCounter("test"))
                                                      .build();
 
     List<File> before = listFiles(WORK_DIR);
@@ -311,6 +319,7 @@ public class TestReliableSpoolingFileEventReader {
   public void testNullConsumeOrder() throws IOException {
     new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
                                                  .consumeOrder(null)
+                                                 .sourceCounter(new SourceCounter("test"))
                                                  .build();
   }
 
@@ -319,6 +328,7 @@ public class TestReliableSpoolingFileEventReader {
     ReliableEventReader reader =
         new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
                                                      .consumeOrder(ConsumeOrder.RANDOM)
+                                                     .sourceCounter(new SourceCounter("test"))
                                                      .build();
     File fileName = new File(WORK_DIR, "new-file");
     FileUtils.write(fileName, "New file created in the end. Shoud be read randomly.\n");
@@ -340,6 +350,7 @@ public class TestReliableSpoolingFileEventReader {
     final ReliableEventReader reader =
         new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
                                                      .consumeOrder(ConsumeOrder.RANDOM)
+                                                     .sourceCounter(new SourceCounter("test"))
                                                      .build();
     File fileName = new File(WORK_DIR, "new-file");
     FileUtils.write(fileName, "New file created in the end. Shoud be read randomly.\n");
@@ -376,6 +387,7 @@ public class TestReliableSpoolingFileEventReader {
     ReliableEventReader reader =
         new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
                                                      .consumeOrder(ConsumeOrder.OLDEST)
+                                                     .sourceCounter(new SourceCounter("test"))
                                                      .build();
     File file1 = new File(WORK_DIR, "new-file1");
     File file2 = new File(WORK_DIR, "new-file2");
@@ -403,6 +415,7 @@ public class TestReliableSpoolingFileEventReader {
     ReliableEventReader reader =
         new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
                                                      .consumeOrder(ConsumeOrder.YOUNGEST)
+                                                     .sourceCounter(new SourceCounter("test"))
                                                      .build();
     File file1 = new File(WORK_DIR, "new-file1");
     File file2 = new File(WORK_DIR, "new-file2");
@@ -434,6 +447,7 @@ public class TestReliableSpoolingFileEventReader {
     ReliableEventReader reader =
         new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
                                                      .consumeOrder(ConsumeOrder.OLDEST)
+                                                     .sourceCounter(new SourceCounter("test"))
                                                      .build();
     File file1 = new File(WORK_DIR, "new-file1");
     File file2 = new File(WORK_DIR, "new-file2");
@@ -463,6 +477,7 @@ public class TestReliableSpoolingFileEventReader {
     ReliableEventReader reader =
         new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
                                                      .consumeOrder(ConsumeOrder.YOUNGEST)
+                                                     .sourceCounter(new SourceCounter("test"))
                                                      .build();
     File file1 = new File(WORK_DIR, "new-file1");
     File file2 = new File(WORK_DIR, "new-file2");
@@ -529,6 +544,7 @@ public class TestReliableSpoolingFileEventReader {
     ReliableEventReader reader =
         new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
                                                      .trackerDirPath(trackerDirPath)
+                                                     .sourceCounter(new SourceCounter("test"))
                                                      .build();
     final int expectedLines = 1;
     int seenLines = 0;
@@ -549,11 +565,12 @@ public class TestReliableSpoolingFileEventReader {
       dir = new File("target/test/work/" + this.getClass().getSimpleName() + "_large");
       Files.createParentDirs(new File(dir, "dummy"));
       ReliableEventReader reader =
-              new ReliableSpoolingFileEventReader.Builder().spoolDirectory(dir)
-                      .consumeOrder(order)
-                      .trackingPolicy(trackingPolicy.toString())
-                      .recursiveDirectorySearch(true)
-                      .build();
+          new ReliableSpoolingFileEventReader.Builder().spoolDirectory(dir)
+                                                       .consumeOrder(order)
+                                                       .trackingPolicy(trackingPolicy.toString())
+                                                       .recursiveDirectorySearch(true)
+                                                       .sourceCounter(new SourceCounter("test"))
+                                                       .build();
       Map<Long, List<String>> expected;
       if (comparator == null) {
         expected = new TreeMap<Long, List<String>>();

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestSpoolingFileLineReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestSpoolingFileLineReader.java b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestSpoolingFileLineReader.java
index bc3aa82..3d43bf4 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestSpoolingFileLineReader.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestSpoolingFileLineReader.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
+import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.serialization.LineDeserializer;
 import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants;
 import org.junit.After;
@@ -71,6 +72,7 @@ public class TestSpoolingFileLineReader {
           .spoolDirectory(tmpDir)
           .completedSuffix(completedSuffix)
           .deserializerContext(ctx)
+          .sourceCounter(new SourceCounter("dummy"))
           .build();
     } catch (IOException ioe) {
       throw Throwables.propagate(ioe);

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
index fdb1772..8b6f493 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
@@ -25,6 +25,7 @@ import org.apache.avro.ipc.NettyServer;
 import org.apache.avro.ipc.Server;
 import org.apache.avro.ipc.specific.SpecificResponder;
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -32,11 +33,13 @@ import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Sink;
 import org.apache.flume.Transaction;
 import org.apache.flume.api.RpcClient;
+import org.apache.flume.channel.BasicTransactionSemantics;
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.channel.ReplicatingChannelSelector;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.lifecycle.LifecycleController;
 import org.apache.flume.lifecycle.LifecycleState;
 import org.apache.flume.source.AvroSource;
@@ -50,6 +53,8 @@ import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 import org.jboss.netty.handler.ssl.SslHandler;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -173,6 +178,29 @@ public class TestAvroSink {
   }
 
   @Test
+  public void testChannelException() throws InterruptedException,
+          EventDeliveryException, InstantiationException, IllegalAccessException {
+    setUp();
+    // Start a server built from MockAvroServer and the sink, then attach a failing channel.
+    Server server = createServer(new MockAvroServer());
+    server.start();
+    sink.start();
+    Channel channel = Mockito.mock(Channel.class);
+    Mockito.when(channel.take()).thenThrow(new ChannelException("dummy"));
+    Transaction transaction = Mockito.mock(BasicTransactionSemantics.class);
+    Mockito.when(channel.getTransaction()).thenReturn(transaction);
+    sink.setChannel(channel);
+    // One process() call against the mocked channel; the returned status is never asserted.
+    Sink.Status status = sink.process();
+    // Shut everything down before inspecting the counter.
+    sink.stop();
+    server.close();
+    // The counter is read reflectively from the sink's "sinkCounter" field.
+    SinkCounter sinkCounter = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter");
+    Assert.assertEquals(1, sinkCounter.getChannelReadFail());
+  }
+
+  @Test
   public void testTimeout() throws InterruptedException,
       EventDeliveryException, InstantiationException, IllegalAccessException {
     setUp();
@@ -226,6 +254,9 @@ public class TestAvroSink {
     Assert.assertTrue(LifecycleController.waitForOneOf(sink,
         LifecycleState.STOP_OR_ERROR, 5000));
     server.close();
+
+    SinkCounter sinkCounter = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter");
+    Assert.assertEquals(2, sinkCounter.getEventWriteFail());
   }
 
   @Test
@@ -280,6 +311,10 @@ public class TestAvroSink {
     Assert.assertTrue(LifecycleController.waitForOneOf(sink,
         LifecycleState.STOP_OR_ERROR, 5000));
     server.close();
+
+    SinkCounter sinkCounter = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter");
+    Assert.assertEquals(5, sinkCounter.getEventWriteFail());
+    Assert.assertEquals(4, sinkCounter.getConnectionFailedCount());
   }
 
   @Test
@@ -608,6 +643,9 @@ public class TestAvroSink {
     if (failed) {
       Assert.fail("SSL-enabled sink successfully connected to a non-SSL-enabled server, that's wrong.");
     }
+
+    SinkCounter sinkCounter = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter");
+    Assert.assertEquals(1, sinkCounter.getEventWriteFail());
   }
 
   @Test
@@ -664,6 +702,8 @@ public class TestAvroSink {
     if (failed) {
       Assert.fail("SSL-enabled sink successfully connected to a server with an untrusted certificate when it should have failed");
     }
+    SinkCounter sinkCounter = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter");
+    Assert.assertEquals(1, sinkCounter.getEventWriteFail());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
index e4d2e2e..6b74e2d 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java
@@ -20,17 +20,23 @@
 package org.apache.flume.sink;
 
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.BasicTransactionSemantics;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.channel.PseudoTxnMemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.SimpleEvent;
-import org.apache.flume.lifecycle.LifecycleException;
+import org.apache.flume.instrumentation.SinkCounter;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,7 +71,7 @@ public class TestRollingFileSink {
   }
 
   @Test
-  public void testLifecycle() throws InterruptedException, LifecycleException {
+  public void testLifecycle() {
     Context context = new Context();
 
     context.put("sink.directory", tmpDir.getPath());
@@ -77,7 +83,7 @@ public class TestRollingFileSink {
   }
 
   @Test
-  public void testAppend() throws InterruptedException, LifecycleException,
+  public void testAppend() throws InterruptedException,
       EventDeliveryException, IOException {
 
     Context context = new Context();
@@ -86,46 +92,11 @@ public class TestRollingFileSink {
     context.put("sink.rollInterval", "1");
     context.put("sink.batchSize", "1");
 
-    Configurables.configure(sink, context);
-
-    Channel channel = new PseudoTxnMemoryChannel();
-    Configurables.configure(channel, context);
-
-    sink.setChannel(channel);
-    sink.start();
-
-    for (int i = 0; i < 10; i++) {
-      Event event = new SimpleEvent();
-
-      event.setBody(("Test event " + i).getBytes());
-
-      channel.put(event);
-      sink.process();
-
-      Thread.sleep(500);
-    }
-
-    sink.stop();
-
-    for (String file : sink.getDirectory().list()) {
-      BufferedReader reader =
-          new BufferedReader(new FileReader(new File(sink.getDirectory(), file)));
-
-      String lastLine = null;
-      String currentLine = null;
-
-      while ((currentLine = reader.readLine()) != null) {
-        lastLine = currentLine;
-      }
-
-      logger.debug("Produced file:{} lastLine:{}", file, lastLine);
-
-      reader.close();
-    }
+    doTest(context);
   }
 
   @Test
-  public void testAppend2() throws InterruptedException, LifecycleException,
+  public void testAppend2() throws InterruptedException,
       EventDeliveryException, IOException {
 
     Context context = new Context();
@@ -134,48 +105,12 @@ public class TestRollingFileSink {
     context.put("sink.rollInterval", "0");
     context.put("sink.batchSize", "1");
 
-
-    Configurables.configure(sink, context);
-
-    Channel channel = new PseudoTxnMemoryChannel();
-    Configurables.configure(channel, context);
-
-    sink.setChannel(channel);
-    sink.start();
-
-    for (int i = 0; i < 10; i++) {
-      Event event = new SimpleEvent();
-
-      event.setBody(("Test event " + i).getBytes());
-
-      channel.put(event);
-      sink.process();
-
-      Thread.sleep(500);
-    }
-
-    sink.stop();
-
-    for (String file : sink.getDirectory().list()) {
-      BufferedReader reader =
-          new BufferedReader(new FileReader(new File(sink.getDirectory(), file)));
-
-      String lastLine = null;
-      String currentLine = null;
-
-      while ((currentLine = reader.readLine()) != null) {
-        lastLine = currentLine;
-        logger.debug("Produced file:{} lastLine:{}", file, lastLine);
-      }
-
-
-      reader.close();
-    }
+    doTest(context);
   }
 
   @Test
   public void testAppend3()
-      throws InterruptedException, LifecycleException, EventDeliveryException, IOException {
+      throws InterruptedException, EventDeliveryException, IOException {
     File tmpDir = new File("target/tmpLog");
     tmpDir.mkdirs();
     cleanDirectory(tmpDir);
@@ -187,46 +122,12 @@ public class TestRollingFileSink {
     context.put("sink.pathManager.prefix", "test3-");
     context.put("sink.pathManager.extension", "txt");
 
-    Configurables.configure(sink, context);
-
-    Channel channel = new PseudoTxnMemoryChannel();
-    Configurables.configure(channel, context);
-
-    sink.setChannel(channel);
-    sink.start();
-
-    for (int i = 0; i < 10; i++) {
-      Event event = new SimpleEvent();
-
-      event.setBody(("Test event " + i).getBytes());
-
-      channel.put(event);
-      sink.process();
-
-      Thread.sleep(500);
-    }
-
-    sink.stop();
-
-    for (String file : sink.getDirectory().list()) {
-      BufferedReader reader =
-          new BufferedReader(new FileReader(new File(sink.getDirectory(), file)));
-
-      String lastLine = null;
-      String currentLine = null;
-
-      while ((currentLine = reader.readLine()) != null) {
-        lastLine = currentLine;
-        logger.debug("Produced file:{} lastLine:{}", file, lastLine);
-      }
-
-      reader.close();
-    }
+    doTest(context);
   }
 
   @Test
   public void testRollTime()
-      throws InterruptedException, LifecycleException, EventDeliveryException, IOException {
+      throws InterruptedException, EventDeliveryException, IOException {
     File tmpDir = new File("target/tempLog");
     tmpDir.mkdirs();
     cleanDirectory(tmpDir);
@@ -239,10 +140,45 @@ public class TestRollingFileSink {
     context.put("sink.pathManager.prefix", "test4-");
     context.put("sink.pathManager.extension", "txt");
 
+    doTest(context);
+  }
+
+  @Test
+  public void testChannelException() throws InterruptedException, IOException {
+    // Checks that a channel read failure shows up in the sink counter's getChannelReadFail().
+    Context context = new Context();
+    // Sink settings: output under tmpDir, rollInterval "0", batchSize "1".
+    context.put("sink.directory", tmpDir.getPath());
+    context.put("sink.rollInterval", "0");
+    context.put("sink.batchSize", "1");
+    // Mocked channel: every take() throws ChannelException; getTransaction() returns a mock.
+    Channel channel = Mockito.mock(Channel.class);
+    Mockito.when(channel.take()).thenThrow(new ChannelException("dummy"));
+    Transaction transaction = Mockito.mock(BasicTransactionSemantics.class);
+    Mockito.when(channel.getTransaction()).thenReturn(transaction);
+    // Drive the sink through the shared helper, using the failing channel.
+    try {
+      doTest(context, channel);
+    } catch (EventDeliveryException e) {
+      // Deliberately ignored: only the counter value asserted below matters to this test.
+    }
+    SinkCounter sinkCounter = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter");
+    Assert.assertEquals(1, sinkCounter.getChannelReadFail());
+  }
+
+  private void doTest(Context context) throws EventDeliveryException, InterruptedException,
+      IOException {
+    doTest(context, null);  // null channel: the overload creates a PseudoTxnMemoryChannel
+  }
+
+  private void doTest(Context context, Channel channel) throws EventDeliveryException,
+      InterruptedException, IOException {
     Configurables.configure(sink, context);
 
-    Channel channel = new PseudoTxnMemoryChannel();
-    Configurables.configure(channel, context);
+    if (channel == null) {
+      channel = new PseudoTxnMemoryChannel();
+      Configurables.configure(channel, context);
+    }
 
     sink.setChannel(channel);
     sink.start();

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
index fcfc72c..6f784ea 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestAvroSource.java
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer;
 import java.security.cert.X509Certificate;
 import java.nio.channels.ServerSocketChannel;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.Executors;
@@ -41,6 +42,7 @@ import javax.net.ssl.X509TrustManager;
 import org.apache.avro.ipc.NettyTransceiver;
 import org.apache.avro.ipc.specific.SpecificRequestor;
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -50,6 +52,7 @@ import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.channel.ReplicatingChannelSelector;
 import org.apache.flume.conf.Configurables;
+import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.lifecycle.LifecycleController;
 import org.apache.flume.lifecycle.LifecycleState;
 import org.apache.flume.source.avro.AvroFlumeEvent;
@@ -64,9 +67,15 @@ import org.jboss.netty.handler.ssl.SslHandler;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Mockito.doThrow;
+
 public class TestAvroSource {
 
   private static final Logger logger = LoggerFactory
@@ -578,4 +587,26 @@ public class TestAvroSource {
     Assert.assertEquals("Server is stopped", LifecycleState.STOP,
         source.getLifecycleState());
   }
+
+  @Test
+  public void testErrorCounterChannelWriteFail() throws Exception {
+    Context context = new Context();
+    context.put("port", String.valueOf(selectedPort = getFreePort()));  // also sets selectedPort
+    context.put("bind", "0.0.0.0");
+    source.configure(context);
+    ChannelProcessor cp = Mockito.mock(ChannelProcessor.class);  // both write paths throw below
+    doThrow(new ChannelException("dummy")).when(cp).processEvent(any(Event.class));
+    doThrow(new ChannelException("dummy")).when(cp).processEventBatch(anyListOf(Event.class));
+    source.setChannelProcessor(cp);
+    source.start();
+    AvroFlumeEvent avroEvent = new AvroFlumeEvent();
+    avroEvent.setHeaders(new HashMap<CharSequence, CharSequence>());
+    avroEvent.setBody(ByteBuffer.wrap("Hello avro ssl".getBytes()));
+    source.append(avroEvent);  // single-event call; return value not checked
+    source.appendBatch(Arrays.asList(avroEvent));  // batch call; return value not checked
+    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(source, "sourceCounter");
+    Assert.assertEquals(2, sc.getChannelWriteFail());  // expects one failure per call above
+    source.stop();  // NOTE(review): not reached if the assertion above fails
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
index 9a6c5f4..8155a12 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestMultiportSyslogTCPSource.java
@@ -38,6 +38,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -57,6 +58,8 @@ import org.apache.mina.transport.socket.nio.NioSession;
 import org.joda.time.DateTime;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
 
 import static org.mockito.Mockito.*;
 
@@ -85,30 +88,28 @@ public class TestMultiportSyslogTCPSource {
     return msg1.getBytes();
   }
 
-  /**
-   * Basic test to exercise multiple-port parsing.
-   */
-  @Test
-  public void testMultiplePorts() throws IOException, ParseException {
-    MultiportSyslogTCPSource source = new MultiportSyslogTCPSource();
-    Channel channel = new MemoryChannel();
-
+  private List<Integer> testNPorts(MultiportSyslogTCPSource source, Channel channel,
+                                   List<Event> channelEvents, int numPorts,
+                                   ChannelProcessor channelProcessor) throws IOException {
     Context channelContext = new Context();
     channelContext.put("capacity", String.valueOf(2000));
     channelContext.put("transactionCapacity", String.valueOf(2000));
     Configurables.configure(channel, channelContext);
 
-    List<Channel> channels = Lists.newArrayList();
-    channels.add(channel);
+    if (channelProcessor == null) {
+      List<Channel> channels = Lists.newArrayList();
+      channels.add(channel);
 
-    ChannelSelector rcs = new ReplicatingChannelSelector();
-    rcs.setChannels(channels);
+      ChannelSelector rcs = new ReplicatingChannelSelector();
+      rcs.setChannels(channels);
 
-    source.setChannelProcessor(new ChannelProcessor(rcs));
-    Context context = new Context();
+      source.setChannelProcessor(new ChannelProcessor(rcs));
+    } else {
+      source.setChannelProcessor(channelProcessor);
+    }
 
-    List<Integer> portList = new ArrayList<>(1000);
-    while (portList.size() < 1000) {
+    List<Integer> portList = new ArrayList<>(numPorts);
+    while (portList.size() < numPorts) {
       int port = getFreePort();
       if (!portList.contains(port)) {
         portList.add(port);
@@ -116,26 +117,26 @@ public class TestMultiportSyslogTCPSource {
     }
 
     StringBuilder ports = new StringBuilder();
-    for (int i = 0; i < 1000; i++) {
+    for (int i = 0; i < numPorts; i++) {
       ports.append(String.valueOf(portList.get(i))).append(" ");
     }
+    Context context = new Context();
     context.put(SyslogSourceConfigurationConstants.CONFIG_PORTS,
         ports.toString().trim());
     source.configure(context);
     source.start();
 
     Socket syslogSocket;
-    for (int i = 0; i < 1000 ; i++) {
+    for (int i = 0; i < numPorts; i++) {
       syslogSocket = new Socket(
-              InetAddress.getLocalHost(), portList.get(i));
+          InetAddress.getLocalHost(), portList.get(i));
       syslogSocket.getOutputStream().write(getEvent(i));
       syslogSocket.close();
     }
 
-    List<Event> channelEvents = new ArrayList<Event>();
     Transaction txn = channel.getTransaction();
     txn.begin();
-    for (int i = 0; i < 1000; i++) {
+    for (int i = 0; i < numPorts; i++) {
       Event e = channel.take();
       if (e == null) {
         throw new NullPointerException("Event is null");
@@ -149,8 +150,26 @@ public class TestMultiportSyslogTCPSource {
     } finally {
       txn.close();
     }
+
+
+    return portList;
+  }
+
+  /**
+   * Basic test to exercise multiple-port parsing.
+   */
+  @Test
+  public void testMultiplePorts() throws IOException, ParseException {
+    MultiportSyslogTCPSource source = new MultiportSyslogTCPSource();
+    Channel channel = new MemoryChannel();
+    List<Event> channelEvents = new ArrayList<>();
+    int numPorts = 1000;
+
+    List<Integer> portList = testNPorts(source, channel, channelEvents,
+        numPorts, null);
+
     //Since events can arrive out of order, search for each event in the array
-    for (int i = 0; i < 1000 ; i++) {
+    for (int i = 0; i < numPorts ; i++) {
       Iterator<Event> iter = channelEvents.iterator();
       while (iter.hasNext()) {
         Event e = iter.next();
@@ -284,6 +303,24 @@ public class TestMultiportSyslogTCPSource {
     Assert.assertArrayEquals("Raw message data should be kept in body of event",
         badUtf8Seq, evt.getBody());
 
+    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(handler, "sourceCounter");
+    Assert.assertEquals(1, sc.getEventReadFail());
+
+  }
+
+  @Test
+  public void testHandlerGenericFail() throws Exception {
+    // defaults to UTF-8
+    MultiportSyslogHandler handler = new MultiportSyslogHandler(
+        1000, 10, new ChannelProcessor(new ReplicatingChannelSelector()),
+        new SourceCounter("test"), "port",
+        new ThreadSafeDecoder(Charsets.UTF_8),
+        new ConcurrentHashMap<Integer, ThreadSafeDecoder>(),
+        null);
+
+    handler.exceptionCaught(null, new RuntimeException("dummy"));
+    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(handler, "sourceCounter");
+    Assert.assertEquals(1, sc.getGenericProcessingFail());
   }
 
   // helper function
@@ -391,6 +428,27 @@ public class TestMultiportSyslogTCPSource {
     evt = takeEvent(chan);
     Assert.assertNotNull("Event vanished!", evt);
     Assert.assertNull(evt.getHeaders().get(SyslogUtils.EVENT_STATUS));
+
+    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(handler, "sourceCounter");
+    Assert.assertEquals(1, sc.getEventReadFail());
+  }
+
+  @Test
+  public void testErrorCounterChannelWriteFail() throws Exception {
+    MultiportSyslogTCPSource source = new MultiportSyslogTCPSource();
+    Channel channel = new MemoryChannel();
+    List<Event> channelEvents = new ArrayList<>();
+    ChannelProcessor cp = Mockito.mock(ChannelProcessor.class);
+    doThrow(new ChannelException("dummy")).doNothing().when(cp)
+        .processEventBatch(anyListOf(Event.class));
+    try {
+      testNPorts(source, channel, channelEvents, 1, cp);
+    } catch (Exception e) {
+      // Swallowed deliberately: only the channelWriteFail counter asserted below matters here.
+    }
+    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(source, "sourceCounter");
+    Assert.assertEquals(1, sc.getChannelWriteFail());
+    source.stop();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
index 92a698d..7c671a6 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
@@ -21,6 +21,8 @@ import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
+import org.apache.flume.ChannelFullException;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -28,13 +30,17 @@ import org.apache.flume.Transaction;
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.client.avro.ReliableSpoolingFileEventReader;
 import org.apache.flume.conf.Configurables;
+import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.lifecycle.LifecycleController;
 import org.apache.flume.lifecycle.LifecycleState;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
@@ -497,4 +503,68 @@ public class TestSpoolDirectorySource {
     txn.close();
     source.stop();
   }
+
+
+  private SourceCounter errorCounterCommonInit() {
+    SourceCounter sc = new SourceCounter("dummy");
+    sc.start();
+    Context context = new Context();
+    context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
+        tmpDir.getAbsolutePath());
+    Configurables.configure(source, context);
+    return sc;
+  }
+
+  @Test
+  public void testErrorCounters() throws Exception {
+    SourceCounter sc = errorCounterCommonInit();
+
+    ChannelProcessor cp = Mockito.mock(ChannelProcessor.class);
+    Mockito.doThrow(new ChannelException("dummy"))
+        .doThrow(new ChannelFullException("dummy"))
+        .doThrow(new RuntimeException("runtime"))
+        .when(cp).processEventBatch(Matchers.anyListOf(Event.class));
+    source.setChannelProcessor(cp);
+
+    ReliableSpoolingFileEventReader reader = Mockito.mock(ReliableSpoolingFileEventReader.class);
+    List<Event> events = new ArrayList<>();
+    events.add(Mockito.mock(Event.class));
+    Mockito.doReturn(events)
+        .doReturn(events)
+        .doReturn(events)
+        .doThrow(new IOException("dummy"))
+        .when(reader).readEvents(Mockito.anyInt());
+
+    Runnable runner = source. new SpoolDirectoryRunnable(reader, sc);
+    try {
+      runner.run();
+    } catch (Exception ex) {
+      // Expected (presumably one of the exceptions stubbed above); counters are checked below.
+    }
+    Assert.assertEquals(2, sc.getChannelWriteFail());
+    Assert.assertEquals(1, sc.getGenericProcessingFail());
+  }
+
+  @Test
+  public void testErrorCounterEventReadFail() throws Exception {
+    SourceCounter sc = errorCounterCommonInit();
+
+    ChannelProcessor cp = Mockito.mock(ChannelProcessor.class);
+    source.setChannelProcessor(cp);
+
+    ReliableSpoolingFileEventReader reader = Mockito.mock(ReliableSpoolingFileEventReader.class);
+    List<Event> events = new ArrayList<>();
+    events.add(Mockito.mock(Event.class));
+    Mockito.doReturn(events)
+        .doThrow(new IOException("dummy"))
+        .when(reader).readEvents(Mockito.anyInt());
+
+    Runnable runner = source. new SpoolDirectoryRunnable(reader, sc);
+    try {
+      runner.run();
+    } catch (Exception ex) {
+      // Expected (presumably the IOException stubbed on the reader); counter is checked below.
+    }
+    Assert.assertEquals(1, sc.getEventReadFail());
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
index f07acc6..fbacdec 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogTcpSource.java
@@ -20,6 +20,7 @@ package org.apache.flume.source;
 
 import com.google.common.base.Charsets;
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -31,6 +32,7 @@ import org.apache.flume.conf.Configurables;
 import org.joda.time.DateTime;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
@@ -40,6 +42,9 @@ import java.net.Socket;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+
 public class TestSyslogTcpSource {
   private static final org.slf4j.Logger logger =
       LoggerFactory.getLogger(TestSyslogTcpSource.class);
@@ -159,5 +164,43 @@ public class TestSyslogTcpSource {
     Assert.assertEquals(10, source.getSourceCounter().getEventAcceptedCount());
     Assert.assertEquals(10, source.getSourceCounter().getEventReceivedCount());
   }
+
+  @Test
+  public void testSourceCounterChannelFail() throws Exception {
+    init("true");
+
+    errorCounterCommon(new ChannelException("dummy"));
+
+    for (int i = 0; i < 10 && source.getSourceCounter().getChannelWriteFail() == 0; i++) {
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(1, source.getSourceCounter().getChannelWriteFail());
+  }
+
+  @Test
+  public void testSourceCounterEventFail() throws Exception {
+    init("true");
+
+    errorCounterCommon(new RuntimeException("dummy"));
+
+    for (int i = 0; i < 10 && source.getSourceCounter().getEventReadFail() == 0; i++) {
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(1, source.getSourceCounter().getEventReadFail());
+  }
+
+  private void errorCounterCommon(Exception e) throws IOException {
+    ChannelProcessor cp = Mockito.mock(ChannelProcessor.class);
+    doThrow(e).when(cp).processEvent(any(Event.class));
+    source.setChannelProcessor(cp);
+
+    source.start();
+    // Write some message to the syslog port
+    InetSocketAddress addr = source.getBoundAddress();
+    try (Socket syslogSocket = new Socket(addr.getAddress(), addr.getPort())) {
+      syslogSocket.getOutputStream().write(bodyWithTandH.getBytes());
+    }
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
index cb3860d..06122e6 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSyslogUdpSource.java
@@ -20,6 +20,7 @@ package org.apache.flume.source;
 
 import com.google.common.base.Charsets;
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -31,6 +32,7 @@ import org.apache.flume.conf.Configurables;
 import org.joda.time.DateTime;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
@@ -43,6 +45,9 @@ import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+
 public class TestSyslogUdpSource {
   private static final org.slf4j.Logger logger =
       LoggerFactory.getLogger(TestSyslogUdpSource.class);
@@ -185,15 +190,7 @@ public class TestSyslogUdpSource {
   public void testSourceCounter() throws Exception {
     init("true");
 
-    source.start();
-    DatagramPacket datagramPacket = createDatagramPacket("test".getBytes());
-    sendDatagramPacket(datagramPacket);
-
-    Transaction txn = channel.getTransaction();
-    txn.begin();
-
-    channel.take();
-    commitAndCloseTransaction(txn);
+    doCounterCommon();
 
     // Retrying up to 10 times while the acceptedCount == 0 because the event processing in
     // SyslogUDPSource is handled on a separate thread by Netty so message delivery,
@@ -201,11 +198,47 @@ public class TestSyslogUdpSource {
     for (int i = 0; i < 10 && source.getSourceCounter().getEventAcceptedCount() == 0; i++) {
       Thread.sleep(100);
     }
-
     Assert.assertEquals(1, source.getSourceCounter().getEventAcceptedCount());
     Assert.assertEquals(1, source.getSourceCounter().getEventReceivedCount());
   }
 
+  private void doCounterCommon() throws IOException, InterruptedException {
+    source.start();
+    DatagramPacket datagramPacket = createDatagramPacket("test".getBytes());
+    sendDatagramPacket(datagramPacket);
+  }
+
+  @Test
+  public void testSourceCounterChannelFail() throws Exception {
+    init("true");
+
+    ChannelProcessor cp = Mockito.mock(ChannelProcessor.class);
+    doThrow(new ChannelException("dummy")).when(cp).processEvent(any(Event.class));
+    source.setChannelProcessor(cp);
+
+    doCounterCommon();
+
+    for (int i = 0; i < 10 && source.getSourceCounter().getChannelWriteFail() == 0; i++) {
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(1, source.getSourceCounter().getChannelWriteFail());
+  }
+
+  @Test
+  public void testSourceCounterReadFail() throws Exception {
+    init("true");
+
+    ChannelProcessor cp = Mockito.mock(ChannelProcessor.class);
+    doThrow(new RuntimeException("dummy")).when(cp).processEvent(any(Event.class));
+    source.setChannelProcessor(cp);
+
+    doCounterCommon();
+    for (int i = 0; i < 10 && source.getSourceCounter().getEventReadFail() == 0; i++) {
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(1, source.getSourceCounter().getEventReadFail());
+  }
+
   private DatagramPacket createDatagramPacket(byte[] payload) {
     InetSocketAddress addr = source.getBoundAddress();
     return new DatagramPacket(payload, payload.length, addr.getAddress(), addr.getPort());

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java
index f7c6361..d594276 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestThriftSource.java
@@ -21,6 +21,7 @@ package org.apache.flume.source;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -36,10 +37,13 @@ import org.apache.flume.channel.ReplicatingChannelSelector;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
 
+import org.apache.flume.instrumentation.SourceCounter;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
 
 import javax.net.ssl.TrustManagerFactory;
 import javax.net.ssl.KeyManagerFactory;
@@ -47,6 +51,7 @@ import javax.net.ssl.KeyManagerFactory;
 import java.io.IOException;
 import java.net.ServerSocket;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -55,6 +60,10 @@ import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Mockito.doThrow;
+
 public class TestThriftSource {
 
   private ThriftSource source;
@@ -290,6 +299,34 @@ public class TestThriftSource {
     }
   }
 
+  @Test
+  public void testErrorCounterChannelWriteFail() throws Exception {
+    client = RpcClientFactory.getThriftInstance(props);
+    Context context = new Context();
+    context.put(ThriftSource.CONFIG_BIND, "0.0.0.0");
+    context.put(ThriftSource.CONFIG_PORT, String.valueOf(port));
+    source.configure(context);
+    ChannelProcessor cp = Mockito.mock(ChannelProcessor.class);
+    doThrow(new ChannelException("dummy")).when(cp).processEvent(any(Event.class));
+    doThrow(new ChannelException("dummy")).when(cp).processEventBatch(anyListOf(Event.class));
+    source.setChannelProcessor(cp);
+    source.start();
+    Event event = EventBuilder.withBody("hello".getBytes());
+    try {
+      client.append(event);
+    } catch (EventDeliveryException e) {
+      // Expected, presumably: the mocked ChannelProcessor rejects the single event.
+    }
+    try {
+      client.appendBatch(Arrays.asList(event));
+    } catch (EventDeliveryException e) {
+      // Expected, presumably: the mocked ChannelProcessor rejects the batch as well.
+    }
+    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(source, "sourceCounter");
+    Assert.assertEquals(2, sc.getChannelWriteFail());
+    source.stop();
+  }
+
   private class SubmitHelper implements Runnable {
     private final int i;
 

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java
index bb61dce..04eec24 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java
@@ -22,6 +22,7 @@ import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import junit.framework.Assert;
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.ChannelSelector;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -31,6 +32,7 @@ import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.channel.ReplicatingChannelSelector;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.JSONEvent;
+import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpOptions;
 import org.apache.http.client.methods.HttpPost;
@@ -44,6 +46,8 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
 
 import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
@@ -78,6 +82,8 @@ import java.util.Random;
 import java.util.Set;
 
 import static org.fest.reflect.core.Reflection.field;
+import static org.mockito.Matchers.anyListOf;
+import static org.mockito.Mockito.doThrow;
 
 /**
  *
@@ -204,7 +210,6 @@ public class TestHTTPSource {
     doTestForbidden(new HttpOptions("http://0.0.0.0:" + selectedPort));
   }
 
-
   private void doTestForbidden(HttpRequestBase request) throws Exception {
     HttpResponse response = httpClient.execute(request);
     Assert.assertEquals(HttpServletResponse.SC_FORBIDDEN,
@@ -248,6 +253,8 @@ public class TestHTTPSource {
 
     Assert.assertEquals(HttpServletResponse.SC_BAD_REQUEST,
             response.getStatusLine().getStatusCode());
+    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(source, "sourceCounter");
+    Assert.assertEquals(1, sc.getEventReadFail());
 
   }
 
@@ -265,6 +272,19 @@ public class TestHTTPSource {
   public void testBigBatchDeserializarionUTF32() throws Exception {
     testBatchWithVariousEncoding("UTF-32");
   }
+
+  @Test
+  public void testCounterGenericFail() throws Exception {
+    ChannelProcessor cp = Mockito.mock(ChannelProcessor.class);
+    doThrow(new RuntimeException("dummy")).when(cp).processEventBatch(anyListOf(Event.class));
+    ChannelProcessor oldCp = source.getChannelProcessor();
+    source.setChannelProcessor(cp);
+    testBatchWithVariousEncoding("UTF-8");
+    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(source, "sourceCounter");
+    Assert.assertEquals(1, sc.getGenericProcessingFail());
+    source.setChannelProcessor(oldCp);
+  }
+
   @Test
   public void testSingleEvent() throws Exception {
     StringEntity input = new StringEntity("[{\"headers\" : {\"a\": \"b\"},\"body\":"
@@ -370,6 +390,8 @@ public class TestHTTPSource {
     HttpResponse response = putWithEncoding("UTF-8", 150).response;
     Assert.assertEquals(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
             response.getStatusLine().getStatusCode());
+    SourceCounter sc = (SourceCounter) Whitebox.getInternalState(source, "sourceCounter");
+    Assert.assertEquals(1, sc.getChannelWriteFail());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index 40f2f4a..22306a0 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -441,10 +441,12 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     } catch (IOException eIO) {
       transaction.rollback();
       LOG.warn("HDFS IO error", eIO);
+      sinkCounter.incrementEventWriteFail();
       return Status.BACKOFF;
     } catch (Throwable th) {
       transaction.rollback();
       LOG.error("process failed", th);
+      sinkCounter.incrementEventWriteOrChannelFail(th);
       if (th instanceof Error) {
         throw (Error) th;
       } else {

http://git-wip-us.apache.org/repos/asf/flume/blob/3a22cd4d/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
index bbc0ba8..f86c96d 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
@@ -45,6 +45,7 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.commons.lang.StringUtils;
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.Clock;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -52,10 +53,12 @@ import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Sink.Status;
 import org.apache.flume.SystemClock;
 import org.apache.flume.Transaction;
+import org.apache.flume.channel.BasicTransactionSemantics;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.event.SimpleEvent;
+import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.lifecycle.LifecycleException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -75,6 +78,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
@@ -754,6 +758,8 @@ public class TestHDFSEventSink {
     sink.stop();
     verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
 
+    SinkCounter sc = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter");
+    Assert.assertEquals(1, sc.getEventWriteFail());
   }
 
 
@@ -930,6 +936,9 @@ public class TestHDFSEventSink {
     sink.stop();
 
     verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
+
+    SinkCounter sc = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter");
+    Assert.assertEquals(1, sc.getEventWriteFail());
   }
 
   /**
@@ -1170,6 +1179,9 @@ public class TestHDFSEventSink {
     }
 
     sink.stop();
+
+    SinkCounter sc = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter");
+    Assert.assertEquals(2, sc.getEventWriteFail());
   }
 
   /*
@@ -1625,4 +1637,29 @@ public class TestHDFSEventSink {
 
     sink.stop();
   }
+
+  @Test
+  public void testChannelException() {
+    LOG.debug("Starting...");
+    Context context = new Context();
+    context.put("hdfs.path", testPath);
+    context.put("keep-alive", "0");
+    Configurables.configure(sink, context);
+    Channel channel = Mockito.mock(Channel.class);
+    Mockito.when(channel.take()).thenThrow(new ChannelException("dummy"));
+    Mockito.when(channel.getTransaction())
+        .thenReturn(Mockito.mock(BasicTransactionSemantics.class));
+    sink.setChannel(channel);
+    sink.start();
+    try {
+      sink.process();
+    } catch (EventDeliveryException e) {
+      // Expected, presumably: channel.take() is stubbed to throw ChannelException.
+    }
+    sink.stop();
+
+    SinkCounter sc = (SinkCounter) Whitebox.getInternalState(sink, "sinkCounter");
+    Assert.assertEquals(1, sc.getChannelReadFail());
+  }
+
 }