You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2015/01/17 01:25:56 UTC

flume git commit: FLUME-2562. Add metrics for Kafka Source, Kafka Sink and Kafka Channel.

Repository: flume
Updated Branches:
  refs/heads/trunk 199684b62 -> 1d9bab676


FLUME-2562. Add metrics for Kafka Source, Kafka Sink and Kafka Channel.

(Gwen Shapira via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/1d9bab67
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/1d9bab67
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/1d9bab67

Branch: refs/heads/trunk
Commit: 1d9bab6760df38e538705a74dd599de03129777b
Parents: 199684b
Author: Hari Shreedharan <hs...@apache.org>
Authored: Fri Jan 16 16:24:24 2015 -0800
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Fri Jan 16 16:24:24 2015 -0800

----------------------------------------------------------------------
 .../flume/channel/kafka/KafkaChannel.java       | 25 +++++-
 .../flume/instrumentation/ChannelCounter.java   |  7 ++
 .../flume/instrumentation/SinkCounter.java      |  8 ++
 .../flume/instrumentation/SourceCounter.java    |  7 ++
 .../kafka/KafkaChannelCounter.java              | 82 ++++++++++++++++++++
 .../kafka/KafkaChannelCounterMBean.java         | 50 ++++++++++++
 .../instrumentation/kafka/KafkaSinkCounter.java | 53 +++++++++++++
 .../kafka/KafkaSinkCounterMBean.java            | 48 ++++++++++++
 .../kafka/KafkaSourceCounter.java               | 64 +++++++++++++++
 .../kafka/KafkaSourceCounterMBean.java          | 47 +++++++++++
 .../org/apache/flume/sink/kafka/KafkaSink.java  | 15 ++++
 .../apache/flume/source/kafka/KafkaSource.java  | 18 +++++
 12 files changed, 423 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index d767aac..80a122d 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -36,6 +36,7 @@ import org.apache.flume.conf.ConfigurationException;
 import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
 
 import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.kafka.KafkaChannelCounter;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,6 +65,8 @@ public class KafkaChannel extends BasicChannelSemantics {
   private final List<ConsumerAndIterator> consumers =
     Collections.synchronizedList(new LinkedList<ConsumerAndIterator>());
 
+  private KafkaChannelCounter counter;
+
   /* Each ConsumerConnector commit will commit all partitions owned by it. To
    * ensure that each partition is only committed when all events are
    * actually done, we will need to keep a ConsumerConnector per thread.
@@ -95,6 +98,7 @@ public class KafkaChannel extends BasicChannelSemantics {
       // We always have just one topic being read by one thread
       LOGGER.info("Topic = " + topic.get());
       topicCountMap.put(topic.get(), 1);
+      counter.start();
       super.start();
     } catch (Exception e) {
       LOGGER.error("Could not start producer");
@@ -114,7 +118,10 @@ public class KafkaChannel extends BasicChannelSemantics {
       }
     }
     producer.close();
+    counter.stop();
     super.stop();
+    LOGGER.info("Kafka channel {} stopped. Metrics: {}", getName(),
+        counter);
   }
 
   @Override
@@ -192,6 +199,10 @@ public class KafkaChannel extends BasicChannelSemantics {
       kafkaConf.put("auto.offset.reset", "smallest");
     }
 
+    if (counter == null) {
+      counter = new KafkaChannelCounter(getName());
+    }
+
   }
 
   private void decommissionConsumerAndIterator(ConsumerAndIterator c) {
@@ -291,7 +302,10 @@ public class KafkaChannel extends BasicChannelSemantics {
       } else {
         try {
           ConsumerIterator<byte[], byte[]> it = consumerAndIter.get().iterator;
+          long startTime = System.nanoTime();
           it.hasNext();
+          long endTime = System.nanoTime();
+          counter.addToKafkaEventGetTimer((endTime-startTime)/(1000*1000));
           if (parseAsFlumeEvent) {
             ByteArrayInputStream in =
               new ByteArrayInputStream(it.next().message());
@@ -339,7 +353,11 @@ public class KafkaChannel extends BasicChannelSemantics {
             messages.add(new KeyedMessage<String, byte[]>(topic.get(), null,
               batchUUID, event));
           }
+          long startTime = System.nanoTime();
           producer.send(messages);
+          long endTime = System.nanoTime();
+          counter.addToKafkaEventSendTimer((endTime-startTime)/(1000*1000));
+          counter.addToEventPutSuccessCount(Long.valueOf(messages.size()));
           serializedEvents.get().clear();
         } catch (Exception ex) {
           LOGGER.warn("Sending events to Kafka failed", ex);
@@ -348,8 +366,12 @@ public class KafkaChannel extends BasicChannelSemantics {
         }
       } else {
         if (consumerAndIter.get().failedEvents.isEmpty() && eventTaken) {
+          long startTime = System.nanoTime();
           consumerAndIter.get().consumer.commitOffsets();
-        }
+          long endTime = System.nanoTime();
+          counter.addToKafkaCommitTimer((endTime-startTime)/(1000*1000));
+         }
+        counter.addToEventTakeSuccessCount(Long.valueOf(events.get().size()));
         events.get().clear();
       }
     }
@@ -362,6 +384,7 @@ public class KafkaChannel extends BasicChannelSemantics {
       if (type.equals(TransactionType.PUT)) {
         serializedEvents.get().clear();
       } else {
+        counter.addToRollbackCounter(Long.valueOf(events.get().size()));
         consumerAndIter.get().failedEvents.addAll(events.get());
         events.get().clear();
       }

http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java
index 9938c0a..977ad6c 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java
@@ -18,6 +18,8 @@
  */
 package org.apache.flume.instrumentation;
 
+import org.apache.commons.lang.ArrayUtils;
+
 public class ChannelCounter extends MonitoredCounterGroup implements
     ChannelCounterMBean {
 
@@ -48,6 +50,11 @@ public class ChannelCounter extends MonitoredCounterGroup implements
     super(MonitoredCounterGroup.Type.CHANNEL, name, ATTRIBUTES);
   }
 
+  public ChannelCounter(String name, String[] attributes) {
+    super(MonitoredCounterGroup.Type.CHANNEL, name,
+        (String[])ArrayUtils.addAll(attributes,ATTRIBUTES));
+  }
+
   @Override
   public long getChannelSize() {
     return get(COUNTER_CHANNEL_SIZE);

http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java
index 41b28cf..54f4a4c 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java
@@ -17,6 +17,8 @@
  */
 package org.apache.flume.instrumentation;
 
+import org.apache.commons.lang.ArrayUtils;
+
 public class SinkCounter extends MonitoredCounterGroup implements
     SinkCounterMBean {
 
@@ -56,6 +58,12 @@ public class SinkCounter extends MonitoredCounterGroup implements
     super(MonitoredCounterGroup.Type.SINK, name, ATTRIBUTES);
   }
 
+  public SinkCounter(String name, String[] attributes) {
+    super(MonitoredCounterGroup.Type.SINK, name,
+        (String[]) ArrayUtils.addAll(attributes,ATTRIBUTES));
+  }
+
+
   @Override
   public long getConnectionCreatedCount() {
     return get(COUNTER_CONNECTION_CREATED);

http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java
index 972d2c6..02ef6ed 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java
@@ -18,6 +18,8 @@
  */
 package org.apache.flume.instrumentation;
 
+import org.apache.commons.lang.ArrayUtils;
+
 public class SourceCounter extends MonitoredCounterGroup implements
     SourceCounterMBean {
 
@@ -53,6 +55,11 @@ public class SourceCounter extends MonitoredCounterGroup implements
     super(MonitoredCounterGroup.Type.SOURCE, name, ATTRIBUTES);
   }
 
+  public SourceCounter(String name, String[] attributes) {
+    super(Type.SOURCE, name,
+        (String[]) ArrayUtils.addAll(attributes,ATTRIBUTES));
+  }
+
   @Override
   public long getEventReceivedCount() {
     return get(COUNTER_EVENTS_RECEIVED);

http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounter.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounter.java
new file mode 100644
index 0000000..6e142cf
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounter.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.instrumentation.kafka;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.flume.instrumentation.ChannelCounter;
+
+public class KafkaChannelCounter extends ChannelCounter
+    implements KafkaChannelCounterMBean {
+
+
+  private static final String TIMER_KAFKA_EVENT_GET =
+      "channel.kafka.event.get.time";
+
+  private static final String TIMER_KAFKA_EVENT_SEND =
+      "channel.kafka.event.send.time";
+
+  private static final String TIMER_KAFKA_COMMIT =
+      "channel.kafka.commit.time";
+
+  private static final String COUNT_ROLLBACK =
+      "channel.rollback.count";
+
+
+  private static String[] ATTRIBUTES = {
+      TIMER_KAFKA_COMMIT,TIMER_KAFKA_EVENT_SEND,TIMER_KAFKA_EVENT_GET,
+      COUNT_ROLLBACK
+  };
+
+
+  public KafkaChannelCounter(String name) {
+    super(name,ATTRIBUTES);
+  }
+
+  public long addToKafkaEventGetTimer(long delta) {
+    return addAndGet(TIMER_KAFKA_EVENT_GET, delta);
+  }
+
+  public long addToKafkaEventSendTimer(long delta) {
+    return addAndGet(TIMER_KAFKA_EVENT_SEND, delta);
+  }
+
+  public long addToKafkaCommitTimer(long delta) {
+    return addAndGet(TIMER_KAFKA_COMMIT, delta);
+  }
+
+  public long addToRollbackCounter(long delta) {
+    return addAndGet(COUNT_ROLLBACK,delta);
+  }
+
+  public long getKafkaEventGetTimer() {
+    return get(TIMER_KAFKA_EVENT_GET);
+  }
+
+  public long getKafkaEventSendTimer() {
+    return get(TIMER_KAFKA_EVENT_SEND);
+  }
+
+  public long getKafkaCommitTimer() {
+    return get(TIMER_KAFKA_COMMIT);
+  }
+
+  public long getRollbackCount() {
+    return get(COUNT_ROLLBACK);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounterMBean.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounterMBean.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounterMBean.java
new file mode 100644
index 0000000..da64f0c
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaChannelCounterMBean.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.instrumentation.kafka;
+
+public interface KafkaChannelCounterMBean {
+
+  long getKafkaEventGetTimer();
+
+  long getKafkaEventSendTimer();
+
+  long getKafkaCommitTimer();
+
+  long getRollbackCount();
+
+  long getChannelSize();
+
+  long getEventPutAttemptCount();
+
+  long getEventTakeAttemptCount();
+
+  long getEventPutSuccessCount();
+
+  long getEventTakeSuccessCount();
+
+  long getStartTime();
+
+  long getStopTime();
+
+  long getChannelCapacity();
+
+  String getType();
+
+  double getChannelFillPercentage();
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSinkCounter.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSinkCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSinkCounter.java
new file mode 100644
index 0000000..1308ff3
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSinkCounter.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.instrumentation.kafka;
+
+import org.apache.flume.instrumentation.SinkCounter;
+
+public class KafkaSinkCounter extends SinkCounter implements KafkaSinkCounterMBean {
+
+  private static final String TIMER_KAFKA_EVENT_SEND =
+      "channel.kafka.event.send.time";
+
+  private static final String COUNT_ROLLBACK =
+      "channel.rollback.count";
+
+  private static final String[] ATTRIBUTES =
+      {COUNT_ROLLBACK,TIMER_KAFKA_EVENT_SEND};
+
+  public KafkaSinkCounter(String name) {
+    super(name,ATTRIBUTES);
+  }
+
+  public long addToKafkaEventSendTimer(long delta) {
+    return addAndGet(TIMER_KAFKA_EVENT_SEND,delta);
+  }
+
+  public long incrementRollbackCount() {
+    return increment(COUNT_ROLLBACK);
+  }
+
+  public long getKafkaEventSendTimer() {
+    return get(TIMER_KAFKA_EVENT_SEND);
+  }
+
+  public long getRollbackCount() {
+    return get(COUNT_ROLLBACK);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSinkCounterMBean.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSinkCounterMBean.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSinkCounterMBean.java
new file mode 100644
index 0000000..f49ca26
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSinkCounterMBean.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.instrumentation.kafka;
+
+public interface KafkaSinkCounterMBean {
+
+  long getKafkaEventSendTimer();
+
+  long getRollbackCount();
+
+  long getConnectionCreatedCount();
+
+  long getConnectionClosedCount();
+
+  long getConnectionFailedCount();
+
+  long getBatchEmptyCount();
+
+  long getBatchUnderflowCount();
+
+  long getBatchCompleteCount();
+
+  long getEventDrainAttemptCount();
+
+  long getEventDrainSuccessCount();
+
+  long getStartTime();
+
+  long getStopTime();
+
+  String getType();
+}
+

http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounter.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounter.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounter.java
new file mode 100644
index 0000000..1cb911d
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounter.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.instrumentation.kafka;
+
+import org.apache.flume.instrumentation.SourceCounter;
+
+public class KafkaSourceCounter extends SourceCounter implements  KafkaSourceCounterMBean {
+
+  private static final String TIMER_KAFKA_EVENT_GET =
+      "source.kafka.event.get.time";
+
+  private static final String TIMER_KAFKA_COMMIT =
+      "source.kafka.commit.time";
+
+  private static final String COUNTER_KAFKA_EMPTY =
+      "source.kafka.empty.count";
+
+  private static final String[] ATTRIBUTES =
+      {TIMER_KAFKA_COMMIT, TIMER_KAFKA_EVENT_GET};
+
+  public KafkaSourceCounter(String name) {
+    super(name, ATTRIBUTES);
+  }
+
+  public long addToKafkaEventGetTimer(long delta) {
+    return addAndGet(TIMER_KAFKA_EVENT_GET,delta);
+  }
+
+  public long addToKafkaCommitTimer(long delta) {
+    return addAndGet(TIMER_KAFKA_COMMIT,delta);
+  }
+
+  public long incrementKafkaEmptyCount() {
+    return increment(COUNTER_KAFKA_EMPTY);
+  }
+
+  public long getKafkaCommitTimer() {
+    return get(TIMER_KAFKA_COMMIT);
+  }
+
+  public long getKafkaEventGetTimer() {
+    return get(TIMER_KAFKA_EVENT_GET);
+  }
+
+  public long getKafkaEmptyCount() {
+    return get(COUNTER_KAFKA_EMPTY);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounterMBean.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounterMBean.java b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounterMBean.java
new file mode 100644
index 0000000..219a5b6
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounterMBean.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.instrumentation.kafka;
+
+public interface KafkaSourceCounterMBean {
+
+  long getKafkaEventGetTimer();
+
+  long getKafkaCommitTimer();
+
+  long getKafkaEmptyCount();
+
+  long getEventReceivedCount();
+
+  long getEventAcceptedCount();
+
+  long getAppendReceivedCount();
+
+  long getAppendAcceptedCount();
+
+  long getAppendBatchReceivedCount();
+
+  long getAppendBatchAcceptedCount();
+
+  long getStartTime();
+
+  long getStopTime();
+
+  String getType();
+
+  long getOpenConnectionCount();
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
index a90b950..eada17c 100644
--- a/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
+++ b/flume-ng-sinks/flume-ng-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java
@@ -24,6 +24,7 @@ import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
 import org.apache.flume.*;
 import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.kafka.KafkaSinkCounter;
 import org.apache.flume.sink.AbstractSink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,6 +74,8 @@ public class KafkaSink extends AbstractSink implements Configurable {
   private String topic;
   private int batchSize;
   private List<KeyedMessage<String, byte[]>> messageList;
+  private KafkaSinkCounter counter;
+
 
   @Override
   public Status process() throws EventDeliveryException {
@@ -122,7 +125,11 @@ public class KafkaSink extends AbstractSink implements Configurable {
 
       // publish batch and commit.
       if (processedEvents > 0) {
+        long startTime = System.nanoTime();
         producer.send(messageList);
+        long endTime = System.nanoTime();
+        counter.addToKafkaEventSendTimer((endTime-startTime)/(1000*1000));
+        counter.addToEventDrainSuccessCount(Long.valueOf(messageList.size()));
       }
 
       transaction.commit();
@@ -134,6 +141,7 @@ public class KafkaSink extends AbstractSink implements Configurable {
       if (transaction != null) {
         try {
           transaction.rollback();
+          counter.incrementRollbackCount();
         } catch (Exception e) {
           logger.error("Transaction rollback failed", e);
           throw Throwables.propagate(e);
@@ -154,12 +162,15 @@ public class KafkaSink extends AbstractSink implements Configurable {
     // instantiate the producer
     ProducerConfig config = new ProducerConfig(kafkaProps);
     producer = new Producer<String, byte[]>(config);
+    counter.start();
     super.start();
   }
 
   @Override
   public synchronized void stop() {
     producer.close();
+    counter.stop();
+    logger.info("Kafka Sink {} stopped. Metrics: {}", getName(), counter);
     super.stop();
   }
 
@@ -202,5 +213,9 @@ public class KafkaSink extends AbstractSink implements Configurable {
     if (logger.isDebugEnabled()) {
       logger.debug("Kafka producer properties: " + kafkaProps);
     }
+
+    if (counter == null) {
+      counter = new KafkaSinkCounter(getName());
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/1d9bab67/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
index 00a81c6..3777639 100644
--- a/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
+++ b/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java
@@ -32,6 +32,8 @@ import org.apache.flume.*;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.ConfigurationException;
 import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SourceCounter;
+import org.apache.flume.instrumentation.kafka.KafkaSourceCounter;
 import org.apache.flume.source.AbstractSource;
 
 import org.slf4j.Logger;
@@ -77,6 +79,7 @@ public class KafkaSource extends AbstractSource
   private Context context;
   private Properties kafkaProps;
   private final List<Event> eventList = new ArrayList<Event>();
+  private KafkaSourceCounter counter;
 
   public Status process() throws EventDeliveryException {
 
@@ -88,6 +91,7 @@ public class KafkaSource extends AbstractSource
     long batchEndTime = System.currentTimeMillis() + timeUpperLimit;
     try {
       boolean iterStatus = false;
+      long startTime = System.nanoTime();
       while (eventList.size() < batchUpperLimit &&
               System.currentTimeMillis() < batchEndTime) {
         iterStatus = hasNext();
@@ -116,22 +120,30 @@ public class KafkaSource extends AbstractSource
           log.debug("Event #: {}", eventList.size());
         }
       }
+      long endTime = System.nanoTime();
+      counter.addToKafkaEventGetTimer((endTime-startTime)/(1000*1000));
+      counter.addToEventReceivedCount(Long.valueOf(eventList.size()));
       // If we have events, send events to channel
       // clear the event list
       // and commit if Kafka doesn't auto-commit
       if (eventList.size() > 0) {
         getChannelProcessor().processEventBatch(eventList);
+        counter.addToEventAcceptedCount(eventList.size());
         eventList.clear();
         if (log.isDebugEnabled()) {
           log.debug("Wrote {} events to channel", eventList.size());
         }
         if (!kafkaAutoCommitEnabled) {
           // commit the read transactions to Kafka to avoid duplicates
+          long commitStartTime = System.nanoTime();
           consumer.commitOffsets();
+          long commitEndTime = System.nanoTime();
+          counter.addToKafkaCommitTimer((commitEndTime-commitStartTime)/(1000*1000));
         }
       }
       if (!iterStatus) {
         if (log.isDebugEnabled()) {
+          counter.incrementKafkaEmptyCount();
           log.debug("Returning with backoff. No more data to read");
         }
         return Status.BACKOFF;
@@ -174,6 +186,9 @@ public class KafkaSource extends AbstractSource
     kafkaAutoCommitEnabled = Boolean.parseBoolean(kafkaProps.getProperty(
             KafkaSourceConstants.AUTO_COMMIT_ENABLED));
 
+    if (counter == null) {
+      counter = new KafkaSourceCounter(getName());
+    }
   }
 
   @Override
@@ -207,6 +222,7 @@ public class KafkaSource extends AbstractSource
       throw new FlumeException("Unable to get message iterator from Kafka", e);
     }
     log.info("Kafka source {} started.", getName());
+    counter.start();
     super.start();
   }
 
@@ -217,6 +233,8 @@ public class KafkaSource extends AbstractSource
       // to avoid reading the same messages again
       consumer.shutdown();
     }
+    counter.stop();
+    log.info("Kafka Source {} stopped. Metrics: {}", getName(), counter);
     super.stop();
   }