You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2012/07/07 00:30:12 UTC

svn commit: r1358458 - in /incubator/flume/branches/branch-1.2.0: conf/ flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ flume-ng-core/src/main/java/org/apache/flume/channel/ flume-ng-core/src/main/java/org/apache/flume...

Author: mpercy
Date: Fri Jul  6 22:30:11 2012
New Revision: 1358458

URL: http://svn.apache.org/viewvc?rev=1358458&view=rev
Log:
FLUME-748. Create metric collection infrastructure.

(Arvind Prabhakar via Mike Percy)

Added:
    incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/
    incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java
    incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounterMBean.java
    incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelProcessorCounter.java
    incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java
    incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java
    incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java
    incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkProcessorCounter.java
    incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java
    incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java
    incubator/flume/branches/branch-1.2.0/flume-ng-core/src/test/java/org/apache/flume/instrumentation/
    incubator/flume/branches/branch-1.2.0/flume-ng-core/src/test/java/org/apache/flume/instrumentation/TestMonitoredCounterGroup.java
Modified:
    incubator/flume/branches/branch-1.2.0/conf/flume-env.sh.template
    incubator/flume/branches/branch-1.2.0/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
    incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
    incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
    incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
    incubator/flume/branches/branch-1.2.0/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
    incubator/flume/branches/branch-1.2.0/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
    incubator/flume/branches/branch-1.2.0/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java

Modified: incubator/flume/branches/branch-1.2.0/conf/flume-env.sh.template
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/conf/flume-env.sh.template?rev=1358458&r1=1358457&r2=1358458&view=diff
==============================================================================
--- incubator/flume/branches/branch-1.2.0/conf/flume-env.sh.template (original)
+++ incubator/flume/branches/branch-1.2.0/conf/flume-env.sh.template Fri Jul  6 22:30:11 2012
@@ -21,8 +21,8 @@
 
 #JAVA_HOME=/usr/lib/jvm/java-6-sun
 
-# Give Flume more memory and pre-allocate
-#JAVA_OPTS="-Xms100m -Xmx200m"
+# Give Flume more memory and pre-allocate, enable remote monitoring via JMX
+#JAVA_OPTS="-Xms100m -Xmx200m -Dcom.sun.management.jmxremote"
 
 # Note that the Flume conf directory is always included in the classpath.
 #FLUME_CLASSPATH=""

Modified: incubator/flume/branches/branch-1.2.0/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java?rev=1358458&r1=1358457&r2=1358458&view=diff
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java (original)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java Fri Jul  6 22:30:11 2012
@@ -34,6 +34,7 @@ import org.apache.flume.Event;
 import org.apache.flume.channel.BasicChannelSemantics;
 import org.apache.flume.channel.BasicTransactionSemantics;
 import org.apache.flume.channel.file.Log.Builder;
+import org.apache.flume.instrumentation.ChannelCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -84,6 +85,7 @@ public class FileChannel extends BasicCh
   private int logWriteTimeout;
   private int checkpointWriteTimeout;
   private String channelNameDescriptor = "[channel=unknown]";
+  private ChannelCounter channelCounter;
 
   @Override
   public synchronized void setName(String name) {
@@ -202,6 +204,10 @@ public class FileChannel extends BasicCh
       log.setCheckpointInterval(checkpointInterval);
       log.setMaxFileSize(maxFileSize);
     }
+
+    if (channelCounter == null) {
+      channelCounter = new ChannelCounter(getName());
+    }
   }
 
   @Override
@@ -231,13 +237,22 @@ public class FileChannel extends BasicCh
       open = false;
       LOG.error("Failed to start the file channel", ex);
     }
+    if (open) {
+      channelCounter.start();
+      channelCounter.setChannelSize(getDepth());
+    }
     super.start();
   }
 
   @Override
   public synchronized void stop() {
     LOG.info("Stopping {}...", this);
+    int size = getDepth();
     close();
+    if (!open) {
+      channelCounter.setChannelSize(size);
+      channelCounter.stop();
+    }
     super.stop();
   }
 
@@ -256,7 +271,8 @@ public class FileChannel extends BasicCh
               trans.getStateAsString()  + channelNameDescriptor);
     }
     trans = new FileBackedTransaction(log, TRANSACTION_ID.incrementAndGet(),
-        transactionCapacity, keepAlive, queueRemaining, getName());
+        transactionCapacity, keepAlive, queueRemaining, getName(),
+        channelCounter);
     transactions.set(trans);
     return trans;
   }
@@ -294,9 +310,10 @@ public class FileChannel extends BasicCh
     private final FlumeEventQueue queue;
     private final Semaphore queueRemaining;
     private final String channelNameDescriptor;
+    private final ChannelCounter channelCounter;
     public FileBackedTransaction(Log log, long transactionID,
         int transCapacity, int keepAlive, Semaphore queueRemaining,
-        String name) {
+        String name, ChannelCounter counter) {
       this.log = log;
       queue = log.getFlumeEventQueue();
       this.transactionID = transactionID;
@@ -305,6 +322,7 @@ public class FileChannel extends BasicCh
       putList = new LinkedBlockingDeque<FlumeEventPointer>(transCapacity);
       takeList = new LinkedBlockingDeque<FlumeEventPointer>(transCapacity);
       channelNameDescriptor = "[channel=" + name + "]";
+      this.channelCounter = counter;
     }
     private boolean isClosed() {
       return State.CLOSED.equals(getState());
@@ -314,6 +332,7 @@ public class FileChannel extends BasicCh
     }
     @Override
     protected void doPut(Event event) throws InterruptedException {
+      channelCounter.incrementEventPutAttemptCount();
       if(putList.remainingCapacity() == 0) {
         throw new ChannelException("Put queue for FileBackedTransaction " +
             "of capacity " + putList.size() + " full, consider " +
@@ -336,6 +355,7 @@ public class FileChannel extends BasicCh
 
     @Override
     protected Event doTake() throws InterruptedException {
+      channelCounter.incrementEventTakeAttemptCount();
       if(takeList.remainingCapacity() == 0) {
         throw new ChannelException("Take list for FileBackedTransaction, capacity " +
             takeList.size() + " full, consider committing more frequently, " +
@@ -384,6 +404,7 @@ public class FileChannel extends BasicCh
         }
         try {
           log.commitPut(transactionID);
+          channelCounter.addToEventPutSuccessCount(puts);
         } catch (IOException e) {
           throw new ChannelException("Commit failed due to IO error "
               + channelNameDescriptor, e);
@@ -391,6 +412,7 @@ public class FileChannel extends BasicCh
       } else if(takes > 0) {
         try {
           log.commitTake(transactionID);
+          channelCounter.addToEventTakeSuccessCount(takes);
         } catch (IOException e) {
           throw new ChannelException("Commit failed due to IO error "
                + channelNameDescriptor, e);
@@ -399,6 +421,7 @@ public class FileChannel extends BasicCh
       }
       putList.clear();
       takeList.clear();
+      channelCounter.setChannelSize(queue.getSize());
     }
 
     @Override
@@ -423,6 +446,7 @@ public class FileChannel extends BasicCh
       }
       putList.clear();
       takeList.clear();
+      channelCounter.setChannelSize(queue.getSize());
     }
 
   }

Modified: incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java?rev=1358458&r1=1358457&r2=1358458&view=diff
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java (original)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java Fri Jul  6 22:30:11 2012
@@ -24,9 +24,10 @@ import java.util.concurrent.TimeUnit;
 
 import javax.annotation.concurrent.GuardedBy;
 
+import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
-import org.apache.flume.ChannelException;
+import org.apache.flume.instrumentation.ChannelCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,14 +42,18 @@ public class MemoryChannel extends Basic
   public class MemoryTransaction extends BasicTransactionSemantics {
     private LinkedBlockingDeque<Event> takeList;
     private LinkedBlockingDeque<Event> putList;
+    private final ChannelCounter channelCounter;
 
-    public MemoryTransaction(int transCapacity) {
+    public MemoryTransaction(int transCapacity, ChannelCounter counter) {
       putList = new LinkedBlockingDeque<Event>(transCapacity);
       takeList = new LinkedBlockingDeque<Event>(transCapacity);
+
+      channelCounter = counter;
     }
 
     @Override
     protected void doPut(Event event) {
+      channelCounter.incrementEventPutAttemptCount();
       if(!putList.offer(event)) {
         throw new ChannelException("Put queue for MemoryTransaction of capacity " +
             putList.size() + " full, consider committing more frequently, " +
@@ -58,6 +63,7 @@ public class MemoryChannel extends Basic
 
     @Override
     protected Event doTake() throws InterruptedException {
+      channelCounter.incrementEventTakeAttemptCount();
       if(takeList.remainingCapacity() == 0) {
         throw new ChannelException("Take list for MemoryTransaction, capacity " +
             takeList.size() + " full, consider committing more frequently, " +
@@ -87,6 +93,7 @@ public class MemoryChannel extends Basic
         }
       }
       int puts = putList.size();
+      int takes = takeList.size();
       synchronized(queueLock) {
         if(puts > 0 ) {
           while(!putList.isEmpty()) {
@@ -102,7 +109,14 @@ public class MemoryChannel extends Basic
       if(remainingChange > 0) {
         queueRemaining.release(remainingChange);
       }
+      if (puts > 0) {
+        channelCounter.addToEventPutSuccessCount(puts);
+      }
+      if (takes > 0) {
+        channelCounter.addToEventTakeSuccessCount(takes);
+      }
 
+      channelCounter.setChannelSize(queue.size());
     }
 
     @Override
@@ -117,6 +131,7 @@ public class MemoryChannel extends Basic
         putList.clear();
       }
       queueStored.release(takes);
+      channelCounter.setChannelSize(queue.size());
     }
 
   }
@@ -140,6 +155,7 @@ public class MemoryChannel extends Basic
   // maximum items in a transaction queue
   private volatile Integer transCapacity;
   private volatile int keepAlive;
+  private ChannelCounter channelCounter;
 
 
   public MemoryChannel() {
@@ -193,6 +209,10 @@ public class MemoryChannel extends Basic
         queueStored = new Semaphore(0);
       }
     }
+
+    if (channelCounter == null) {
+      channelCounter = new ChannelCounter(getName());
+    }
   }
 
   private void resizeQueue(int capacity) throws InterruptedException {
@@ -224,7 +244,21 @@ public class MemoryChannel extends Basic
   }
 
   @Override
+  public synchronized void start() {
+    channelCounter.start();
+    channelCounter.setChannelSize(queue.size());
+    super.start();
+  }
+
+  @Override
+  public synchronized void stop() {
+    channelCounter.setChannelSize(queue.size());
+    channelCounter.stop();
+    super.stop();
+  }
+
+  @Override
   protected BasicTransactionSemantics createTransaction() {
-    return new MemoryTransaction(transCapacity);
+    return new MemoryTransaction(transCapacity, channelCounter);
   }
 }

Added: incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java?rev=1358458&view=auto
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java (added)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounter.java Fri Jul  6 22:30:11 2012
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.instrumentation;
+
+public class ChannelCounter extends MonitoredCounterGroup implements
+    ChannelCounterMBean {
+
+  private static final String COUNTER_CHANNEL_SIZE = "channel.current.size";
+
+  private static final String COUNTER_EVENT_PUT_ATTEMPT =
+      "channel.event.put.attempt";
+
+  private static final String COUNTER_EVENT_TAKE_ATTEMPT =
+      "channel.event.take.attempt";
+
+  private static final String COUNTER_EVENT_PUT_SUCCESS =
+      "channel.event.put.success";
+
+  private static final String COUNTER_EVENT_TAKE_SUCCESS =
+      "channel.event.take.success";
+
+  private static final String[] ATTRIBUTES = {
+    COUNTER_CHANNEL_SIZE, COUNTER_EVENT_PUT_ATTEMPT,
+    COUNTER_EVENT_TAKE_ATTEMPT, COUNTER_EVENT_PUT_SUCCESS,
+    COUNTER_EVENT_TAKE_SUCCESS
+  };
+
+  public ChannelCounter(String name) {
+    super(MonitoredCounterGroup.Type.CHANNEL, name, ATTRIBUTES);
+  }
+
+  @Override
+  public long getChannelSize() {
+    return get(COUNTER_CHANNEL_SIZE);
+  }
+
+  public void setChannelSize(long newSize) {
+    set(COUNTER_CHANNEL_SIZE, newSize);
+  }
+
+  @Override
+  public long getEventPutAttemptCount() {
+    return get(COUNTER_EVENT_PUT_ATTEMPT);
+  }
+
+  public long incrementEventPutAttemptCount() {
+    return increment(COUNTER_EVENT_PUT_ATTEMPT);
+  }
+
+  @Override
+  public long getEventTakeAttemptCount() {
+    return get(COUNTER_EVENT_TAKE_ATTEMPT);
+  }
+
+  public long incrementEventTakeAttemptCount() {
+    return increment(COUNTER_EVENT_TAKE_ATTEMPT);
+  }
+
+  @Override
+  public long getEventPutSuccessCount() {
+    return get(COUNTER_EVENT_PUT_SUCCESS);
+  }
+
+  public long addToEventPutSuccessCount(long delta) {
+    return addAndGet(COUNTER_EVENT_PUT_SUCCESS, delta);
+  }
+
+  @Override
+  public long getEventTakeSuccessCount() {
+    return get(COUNTER_EVENT_TAKE_SUCCESS);
+  }
+
+  public long addToEventTakeSuccessCount(long delta) {
+    return addAndGet(COUNTER_EVENT_TAKE_SUCCESS, delta);
+  }
+}

Added: incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounterMBean.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounterMBean.java?rev=1358458&view=auto
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounterMBean.java (added)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelCounterMBean.java Fri Jul  6 22:30:11 2012
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.instrumentation;
+
+public interface ChannelCounterMBean {
+
+  long getChannelSize();
+
+  long getEventPutAttemptCount();
+
+  long getEventTakeAttemptCount();
+
+  long getEventPutSuccessCount();
+
+  long getEventTakeSuccessCount();
+
+  long getStartTime();
+
+  long getStopTime();
+}

Added: incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelProcessorCounter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelProcessorCounter.java?rev=1358458&view=auto
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelProcessorCounter.java (added)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/ChannelProcessorCounter.java Fri Jul  6 22:30:11 2012
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.instrumentation;
+
+public class ChannelProcessorCounter extends MonitoredCounterGroup {
+
+  protected ChannelProcessorCounter(String name) {
+    super(MonitoredCounterGroup.Type.CHANNEL_PROCESSOR, name);
+  }
+
+}

Added: incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java?rev=1358458&view=auto
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java (added)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/MonitoredCounterGroup.java Fri Jul  6 22:30:11 2012
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.instrumentation;
+
+import java.lang.management.ManagementFactory;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.management.ObjectName;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class MonitoredCounterGroup {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MonitoredCounterGroup.class);
+
+  private final Type type;
+  private final String name;
+  private final Map<String, AtomicLong> counterMap;
+
+  private AtomicLong startTime;
+  private AtomicLong stopTime;
+
+
+  protected MonitoredCounterGroup(Type type, String name, String... attrs) {
+    this.type = type;
+    this.name = name;
+
+    Map<String, AtomicLong> counterInitMap = new HashMap<String, AtomicLong>();
+
+    // Initialize the counters
+    for (String attribute : attrs) {
+      counterInitMap.put(attribute, new AtomicLong(0L));
+    }
+
+    counterMap = Collections.unmodifiableMap(counterInitMap);
+
+    startTime = new AtomicLong(0L);
+    stopTime = new AtomicLong(0L);
+
+    try {
+      ObjectName objName = new ObjectName("org.apache.flume."
+          + type.name().toLowerCase() + ":type=" + this.name);
+
+      ManagementFactory.getPlatformMBeanServer().registerMBean(this, objName);
+
+      LOG.info("Monitored counter group for type: " + type + ", name: " + name
+          + ", registered successfully.");
+    } catch (Exception ex) {
+      LOG.error("Failed to register monitored counter group for type: "
+          + type + ", name: " + name, ex);
+    }
+  }
+
+  public void start() {
+    stopTime.set(0L);
+    for (String counter : counterMap.keySet()) {
+      counterMap.get(counter).set(0L);
+    }
+    startTime.set(System.currentTimeMillis());
+    LOG.info("Component type: " + type + ", name: " + name + " started");
+  }
+
+  public void stop() {
+    stopTime.set(System.currentTimeMillis());
+    LOG.info("Component type: " + type + ", name: " + name + " stopped");
+  }
+
+  public long getStartTime() {
+    return startTime.get();
+  }
+
+  public long getStopTime() {
+    return stopTime.get();
+  }
+
+  @Override
+  public final String toString() {
+    StringBuilder sb = new StringBuilder(type.name()).append(":");
+    sb.append(name).append("{");
+    boolean first = true;
+    Iterator<String> counterIterator = counterMap.keySet().iterator();
+    while (counterIterator.hasNext()) {
+      if (first) {
+        first = false;
+      } else {
+        sb.append(", ");
+      }
+      String counterName = counterIterator.next();
+      sb.append(counterName).append("=").append(get(counterName));
+    }
+    sb.append("}");
+
+    return sb.toString();
+  }
+
+
+  protected long get(String counter) {
+    return counterMap.get(counter).get();
+  }
+
+  protected void set(String counter, long value) {
+    counterMap.get(counter).set(value);
+  }
+
+  protected long addAndGet(String counter, long delta) {
+    return counterMap.get(counter).addAndGet(delta);
+  }
+
+  protected long increment(String counter) {
+    return counterMap.get(counter).incrementAndGet();
+  }
+
+  public static enum Type {
+    SOURCE,
+    CHANNEL_PROCESSOR,
+    CHANNEL,
+    SINK_PROCESSOR,
+    SINK
+  };
+}

Added: incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java?rev=1358458&view=auto
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java (added)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounter.java Fri Jul  6 22:30:11 2012
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.instrumentation;
+
+public class SinkCounter extends MonitoredCounterGroup implements
+    SinkCounterMBean {
+
+  private static final String COUNTER_CONNECTION_CREATED =
+      "sink.connection.creation.count";
+
+  private static final String COUNTER_CONNECTION_CLOSED =
+      "sink.connection.closed.count";
+
+  private static final String COUNTER_CONNECTION_FAILED =
+      "sink.connection.failed.count";
+
+  private static final String COUNTER_BATCH_EMPTY =
+      "sink.batch.empty";
+
+  private static final String COUNTER_BATCH_UNDERFLOW =
+      "sink.batch.underflow";
+
+  private static final String COUNTER_BATCH_COMPLETE =
+      "sink.batch.complete";
+
+  private static final String COUNTER_EVENT_DRAIN_ATTEMPT =
+      "sink.event.drain.attempt";
+
+  private static final String COUNTER_EVENT_DRAIN_SUCCESS =
+      "sink.event.drain.sucess";
+
+  private static final String[] ATTRIBUTES = {
+    COUNTER_CONNECTION_CREATED, COUNTER_CONNECTION_CLOSED,
+    COUNTER_CONNECTION_FAILED, COUNTER_BATCH_EMPTY,
+    COUNTER_BATCH_UNDERFLOW, COUNTER_BATCH_COMPLETE,
+    COUNTER_EVENT_DRAIN_ATTEMPT, COUNTER_EVENT_DRAIN_SUCCESS
+  };
+
+
+  public SinkCounter(String name) {
+    super(MonitoredCounterGroup.Type.SINK, name, ATTRIBUTES);
+  }
+
+  @Override
+  public long getConnectionCreatedCount() {
+    return get(COUNTER_CONNECTION_CREATED);
+  }
+
+  public long incrementConnectionCreatedCount() {
+    return increment(COUNTER_CONNECTION_CREATED);
+  }
+
+  @Override
+  public long getConnectionClosedCount() {
+    return get(COUNTER_CONNECTION_CLOSED);
+  }
+
+  public long incrementConnectionClosedCount() {
+    return increment(COUNTER_CONNECTION_CLOSED);
+  }
+
+  @Override
+  public long getConnectionFailedCount() {
+    return get(COUNTER_CONNECTION_FAILED);
+  }
+
+  public long incrementConnectionFailedCount() {
+    return increment(COUNTER_CONNECTION_FAILED);
+  }
+
+  @Override
+  public long getBatchEmptyCount() {
+    return get(COUNTER_BATCH_EMPTY);
+  }
+
+  public long incrementBatchEmptyCount() {
+    return increment(COUNTER_BATCH_EMPTY);
+  }
+
+  @Override
+  public long getBatchUnderflowCount() {
+    return get(COUNTER_BATCH_UNDERFLOW);
+  }
+
+  public long incrementBatchUnderflowCount() {
+    return increment(COUNTER_BATCH_UNDERFLOW);
+  }
+
+  @Override
+  public long getBatchCompleteCount() {
+    return get(COUNTER_BATCH_COMPLETE);
+  }
+
+  public long incrementBatchCompleteCount() {
+    return increment(COUNTER_BATCH_COMPLETE);
+  }
+
+  @Override
+  public long getEventDrainAttemptCount() {
+    return get(COUNTER_EVENT_DRAIN_ATTEMPT);
+  }
+
+  public long incrementEventDrainAttemptCount() {
+    return increment(COUNTER_EVENT_DRAIN_ATTEMPT);
+  }
+
+  public long addToEventDrainAttemptCount(long delta) {
+    return addAndGet(COUNTER_EVENT_DRAIN_ATTEMPT, delta);
+  }
+
+  @Override
+  public long getEventDrainSuccessCount() {
+    return get(COUNTER_EVENT_DRAIN_SUCCESS);
+  }
+
+  public long incrementEventDrainSuccessCount() {
+    return increment(COUNTER_EVENT_DRAIN_SUCCESS);
+  }
+
+  public long addToEventDrainSuccessCount(long delta) {
+    return addAndGet(COUNTER_EVENT_DRAIN_SUCCESS, delta);
+  }
+}

Added: incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java?rev=1358458&view=auto
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java (added)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkCounterMBean.java Fri Jul  6 22:30:11 2012
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.instrumentation;
+
+public interface SinkCounterMBean {
+
+  long getConnectionCreatedCount();
+
+  long getConnectionClosedCount();
+
+  long getConnectionFailedCount();
+
+  long getBatchEmptyCount();
+
+  long getBatchUnderflowCount();
+
+  long getBatchCompleteCount();
+
+  long getEventDrainAttemptCount();
+
+  long getEventDrainSuccessCount();
+
+  long getStartTime();
+
+  long getStopTime();
+}

Added: incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkProcessorCounter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkProcessorCounter.java?rev=1358458&view=auto
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkProcessorCounter.java (added)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SinkProcessorCounter.java Fri Jul  6 22:30:11 2012
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.instrumentation;
+
+public class SinkProcessorCounter extends MonitoredCounterGroup {
+
+  protected SinkProcessorCounter(String name) {
+    super(MonitoredCounterGroup.Type.SINK_PROCESSOR, name);
+  }
+
+}

Added: incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java?rev=1358458&view=auto
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java (added)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounter.java Fri Jul  6 22:30:11 2012
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.instrumentation;
+
+public class SourceCounter extends MonitoredCounterGroup implements
+    SourceCounterMBean {
+
+  private static final String COUNTER_EVENTS_RECEIVED =
+      "src.events.received";
+  private static final String COUNTER_EVENTS_ACCEPTED =
+      "src.events.accepted";
+
+  private static final String COUNTER_APPEND_RECEIVED =
+      "src.append.received";
+  private static final String COUNTER_APPEND_ACCEPTED =
+      "src.append.accepted";
+
+  private static final String COUNTER_APPEND_BATCH_RECEIVED =
+      "src.append-batch.received";
+  private static final String COUNTER_APPEND_BATCH_ACCEPTED =
+      "src.append-batch.accepted";
+
+
+  private static final String[] ATTRIBUTES =
+    {
+      COUNTER_EVENTS_RECEIVED, COUNTER_EVENTS_ACCEPTED,
+      COUNTER_APPEND_RECEIVED, COUNTER_APPEND_ACCEPTED,
+      COUNTER_APPEND_BATCH_RECEIVED, COUNTER_APPEND_BATCH_ACCEPTED
+    };
+
+
+  public SourceCounter(String name) {
+    super(MonitoredCounterGroup.Type.SOURCE, name, ATTRIBUTES);
+  }
+
+  @Override
+  public long getEventReceivedCount() {
+    return get(COUNTER_EVENTS_RECEIVED);
+  }
+
+  public long incrementEventReceivedCount() {
+    return increment(COUNTER_EVENTS_RECEIVED);
+  }
+
+  public long addToEventReceivedCount(long delta) {
+    return addAndGet(COUNTER_EVENTS_RECEIVED, delta);
+  }
+
+  @Override
+  public long getEventAcceptedCount() {
+    return get(COUNTER_EVENTS_ACCEPTED);
+  }
+
+  public long incrementEventAcceptedCount() {
+    return increment(COUNTER_EVENTS_ACCEPTED);
+  }
+
+  public long addToEventAcceptedCount(long delta) {
+    return addAndGet(COUNTER_EVENTS_ACCEPTED, delta);
+  }
+
+  @Override
+  public long getAppendReceivedCount() {
+    return get(COUNTER_APPEND_RECEIVED);
+  }
+
+  public long incrementAppendReceivedCount() {
+    return increment(COUNTER_APPEND_RECEIVED);
+  }
+
+  @Override
+  public long getAppendAcceptedCount() {
+    return get(COUNTER_APPEND_ACCEPTED);
+  }
+
+  public long incrementAppendAcceptedCount() {
+    return increment(COUNTER_APPEND_ACCEPTED);
+  }
+
+  @Override
+  public long getAppendBatchReceivedCount() {
+    return get(COUNTER_APPEND_BATCH_RECEIVED);
+  }
+
+  public long incrementAppendBatchReceivedCount() {
+    return increment(COUNTER_APPEND_BATCH_RECEIVED);
+  }
+
+  @Override
+  public long getAppendBatchAcceptedCount() {
+    return get(COUNTER_APPEND_BATCH_ACCEPTED);
+  }
+
+  public long incrementAppendBatchAcceptedCount() {
+    return increment(COUNTER_APPEND_BATCH_ACCEPTED);
+  }
+}

Added: incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java?rev=1358458&view=auto
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java (added)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/instrumentation/SourceCounterMBean.java Fri Jul  6 22:30:11 2012
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.instrumentation;
+
+public interface SourceCounterMBean {
+
+  long getEventReceivedCount();
+
+  long getEventAcceptedCount();
+
+  long getAppendReceivedCount();
+
+  long getAppendAcceptedCount();
+
+  long getAppendBatchReceivedCount();
+
+  long getAppendBatchAcceptedCount();
+
+  long getStartTime();
+
+  long getStopTime();
+}

Modified: incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java?rev=1358458&r1=1358457&r2=1358458&view=diff
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java (original)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java Fri Jul  6 22:30:11 2012
@@ -20,27 +20,27 @@
 package org.apache.flume.sink;
 
 import java.util.List;
+import java.util.Properties;
 
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
-import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
 import org.apache.flume.Sink;
 import org.apache.flume.Transaction;
 import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientConfigurationConstants;
 import org.apache.flume.api.RpcClientFactory;
 import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.source.AvroSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import java.util.Properties;
-import org.apache.flume.api.RpcClientConfigurationConstants;
 
 /**
  * <p>
@@ -125,12 +125,8 @@ public class AvroSink extends AbstractSi
   private Integer port;
 
   private RpcClient client;
-  private CounterGroup counterGroup;
   private Properties clientProps;
-
-  public AvroSink() {
-    counterGroup = new CounterGroup();
-  }
+  private SinkCounter sinkCounter;
 
   @Override
   public void configure(Context context) {
@@ -165,6 +161,10 @@ public class AvroSink extends AbstractSi
           RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT,
           String.valueOf(requestTimeout));
     }
+
+    if (sinkCounter == null) {
+      sinkCounter = new SinkCounter(getName());
+    }
   }
 
   /**
@@ -178,8 +178,16 @@ public class AvroSink extends AbstractSi
       logger.info("Avro sink {}: Building RpcClient with hostname: {}, " +
           "port: {}",
           new Object[] { getName(), hostname, port });
-
+      try {
        client = RpcClientFactory.getInstance(clientProps);
+      } catch (Exception ex) {
+        sinkCounter.incrementConnectionFailedCount();
+        if (ex instanceof FlumeException) {
+          throw (FlumeException) ex;
+        } else {
+          throw new FlumeException(ex);
+        }
+      }
        logger.debug("Avro sink {}: Created RpcClient: {}", getName(), client);
     }
 
@@ -190,7 +198,9 @@ public class AvroSink extends AbstractSi
       logger.debug("Avro sink {} closing avro client: {}", getName(), client);
       try {
         client.close();
+        sinkCounter.incrementConnectionClosedCount();
       } catch (FlumeException e) {
+        sinkCounter.incrementConnectionFailedCount();
         logger.error("Avro sink " + getName() + ": Attempt to close avro " +
             "client failed. Exception follows.", e);
       }
@@ -226,7 +236,7 @@ public class AvroSink extends AbstractSi
   @Override
   public void start() {
     logger.info("Starting {}...", this);
-
+    sinkCounter.start();
     try {
       createConnection();
     } catch (FlumeException e) {
@@ -247,10 +257,10 @@ public class AvroSink extends AbstractSi
     logger.info("Avro sink {} stopping...", getName());
 
     destroyConnection();
-
+    sinkCounter.stop();
     super.stop();
 
-    logger.info("Avro sink {} stopped. Metrics: {}", getName(), counterGroup);
+    logger.info("Avro sink {} stopped. Metrics: {}", getName(), sinkCounter);
   }
 
   @Override
@@ -276,26 +286,33 @@ public class AvroSink extends AbstractSi
         Event event = channel.take();
 
         if (event == null) {
-          counterGroup.incrementAndGet("batch.underflow");
           break;
         }
 
         batch.add(event);
       }
 
-      if (batch.isEmpty()) {
-        counterGroup.incrementAndGet("batch.empty");
+      int size = batch.size();
+      int batchSize = client.getBatchSize();
+
+      if (size == 0) {
+        sinkCounter.incrementBatchEmptyCount();
         status = Status.BACKOFF;
       } else {
+        if (size < batchSize) {
+          sinkCounter.incrementBatchUnderflowCount();
+        } else {
+          sinkCounter.incrementBatchCompleteCount();
+        }
+        sinkCounter.addToEventDrainAttemptCount(size);
         client.appendBatch(batch);
       }
 
       transaction.commit();
-      counterGroup.incrementAndGet("batch.success");
+      sinkCounter.addToEventDrainSuccessCount(size);
 
     } catch (Throwable t) {
       transaction.rollback();
-      counterGroup.incrementAndGet("batch.failure");
       if (t instanceof Error) {
         throw (Error) t;
       } else if (t instanceof ChannelException) {

Modified: incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java?rev=1358458&r1=1358457&r2=1358458&view=diff
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java (original)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java Fri Jul  6 22:30:11 2012
@@ -33,12 +33,12 @@ import org.apache.avro.ipc.specific.Spec
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
-import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
 import org.apache.flume.EventDrivenSource;
 import org.apache.flume.Source;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.flume.source.avro.AvroSourceProtocol;
 import org.apache.flume.source.avro.Status;
@@ -113,14 +113,10 @@ public class AvroSource extends Abstract
   private String bindAddress;
 
   private Server server;
-  private CounterGroup counterGroup;
+  private SourceCounter sourceCounter;
 
   private int maxThreads;
 
-  public AvroSource() {
-    counterGroup = new CounterGroup();
-  }
-
   @Override
   public void configure(Context context) {
     port = Integer.parseInt(context.getString("port"));
@@ -131,6 +127,10 @@ public class AvroSource extends Abstract
       logger.warn("AVRO source\'s \"threads\" property must specify an integer value.",
               context.getString(THREADS));
     }
+
+    if (sourceCounter == null) {
+      sourceCounter = new SourceCounter(getName());
+    }
   }
 
   @Override
@@ -149,7 +149,7 @@ public class AvroSource extends Abstract
     }
 
     server.start();
-
+    sourceCounter.start();
     super.start();
 
     logger.info("Avro source {} started.", getName());
@@ -167,10 +167,11 @@ public class AvroSource extends Abstract
       logger.info("Avro source " + getName() + ": Interrupted while waiting " +
           "for Avro server to stop. Exiting. Exception follows.", e);
     }
-
+    sourceCounter.stop();
     super.stop();
 
-    logger.info("Avro source {} stopped. Metrics: {}", getName(), counterGroup);
+    logger.info("Avro source {} stopped. Metrics: {}", getName(),
+        sourceCounter);
   }
 
   @Override
@@ -194,8 +195,10 @@ public class AvroSource extends Abstract
 
   @Override
   public Status append(AvroFlumeEvent avroEvent) {
-    logger.debug("Avro source {}: Received avro event: {}", getName(), avroEvent);
-    counterGroup.incrementAndGet("rpc.received");
+    logger.debug("Avro source {}: Received avro event: {}", getName(),
+        avroEvent);
+    sourceCounter.incrementAppendReceivedCount();
+    sourceCounter.incrementEventReceivedCount();
 
     Event event = EventBuilder.withBody(avroEvent.getBody().array(),
         toStringMap(avroEvent.getHeaders()));
@@ -208,7 +211,8 @@ public class AvroSource extends Abstract
       return Status.FAILED;
     }
 
-    counterGroup.incrementAndGet("rpc.successful");
+    sourceCounter.incrementAppendAcceptedCount();
+    sourceCounter.incrementEventAcceptedCount();
 
     return Status.OK;
   }
@@ -217,14 +221,14 @@ public class AvroSource extends Abstract
   public Status appendBatch(List<AvroFlumeEvent> events) {
     logger.debug("Avro source {}: Received avro event batch of {} events.",
         getName(), events.size());
-    counterGroup.incrementAndGet("rpc.batch.received");
+    sourceCounter.incrementAppendBatchReceivedCount();
+    sourceCounter.addToEventReceivedCount(events.size());
 
     List<Event> batch = new ArrayList<Event>();
 
     for (AvroFlumeEvent avroEvent : events) {
       Event event = EventBuilder.withBody(avroEvent.getBody().array(),
           toStringMap(avroEvent.getHeaders()));
-      counterGroup.incrementAndGet("rpc.batch.events");
 
       batch.add(event);
     }
@@ -237,7 +241,8 @@ public class AvroSource extends Abstract
       return Status.FAILED;
     }
 
-    counterGroup.incrementAndGet("rpc.batch.successful");
+    sourceCounter.incrementAppendBatchAcceptedCount();
+    sourceCounter.addToEventAcceptedCount(events.size());
 
     return Status.OK;
   }

Added: incubator/flume/branches/branch-1.2.0/flume-ng-core/src/test/java/org/apache/flume/instrumentation/TestMonitoredCounterGroup.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-core/src/test/java/org/apache/flume/instrumentation/TestMonitoredCounterGroup.java?rev=1358458&view=auto
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-core/src/test/java/org/apache/flume/instrumentation/TestMonitoredCounterGroup.java (added)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-core/src/test/java/org/apache/flume/instrumentation/TestMonitoredCounterGroup.java Fri Jul  6 22:30:11 2012
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.instrumentation;
+
+import java.lang.management.ManagementFactory;
+import java.util.Random;
+
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanException;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+
+import junit.framework.Assert;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestMonitoredCounterGroup {
+
+  private static final int MAX_BOUNDS = 1000;
+  private static final String ROOT_OBJ_NAME_PREFIX = "org.apache.flume.";
+  private static final String SOURCE_OBJ_NAME_PREFIX = ROOT_OBJ_NAME_PREFIX
+      + "source:type=";
+  private static final String CHANNEL_OBJ_NAME_PREFIX = ROOT_OBJ_NAME_PREFIX
+      + "channel:type=";
+  private static final String SINK_OBJ_NAME_PREFIX = ROOT_OBJ_NAME_PREFIX
+      + "sink:type=";
+
+  private static final String ATTR_START_TIME = "StartTime";
+  private static final String ATTR_STOP_TIME = "StopTime";
+
+  private static final String SRC_ATTR_EVENT_RECEVIED_COUNT =
+      "EventReceivedCount";
+  private static final String SRC_ATTR_EVENT_ACCEPTED_COUNT =
+      "EventAcceptedCount";
+  private static final String SRC_ATTR_APPEND_RECEVIED_COUNT =
+      "AppendReceivedCount";
+  private static final String SRC_ATTR_APPEND_ACCEPTED_COUNT =
+      "AppendAcceptedCount";
+  private static final String SRC_ATTR_APPEND_BATCH_RECEVIED_COUNT =
+      "AppendBatchReceivedCount";
+  private static final String SRC_ATTR_APPEND_BATCH_ACCEPTED_COUNT =
+      "AppendBatchAcceptedCount";
+
+
+  private static final String CH_ATTR_CHANNEL_SIZE = "ChannelSize";
+  private static final String CH_ATTR_EVENT_PUT_ATTEMPT =
+      "EventPutAttemptCount";
+  private static final String CH_ATTR_EVENT_TAKE_ATTEMPT =
+      "EventTakeAttemptCount";
+  private static final String CH_ATTR_EVENT_PUT_SUCCESS =
+      "EventPutSuccessCount";
+  private static final String CH_ATTR_EVENT_TAKE_SUCCESS =
+      "EventTakeSuccessCount";
+
+  private static final String SK_ATTR_CONN_CREATED =
+      "ConnectionCreatedCount";
+  private static final String SK_ATTR_CONN_CLOSED =
+      "ConnectionClosedCount";
+  private static final String SK_ATTR_CONN_FAILED =
+      "ConnectionFailedCount";
+  private static final String SK_ATTR_BATCH_EMPTY =
+      "BatchEmptyCount";
+  private static final String SK_ATTR_BATCH_UNDERFLOW =
+      "BatchUnderflowCount";
+  private static final String SK_ATTR_BATCH_COMPLETE =
+      "BatchCompleteCount";
+  private static final String SK_ATTR_EVENT_DRAIN_ATTEMPT =
+      "EventDrainAttemptCount";
+  private static final String SK_ATTR_EVENT_DRAIN_SUCCESS =
+      "EventDrainSuccessCount";
+
+  private MBeanServer mbServer;
+  private Random random;
+
+  @Before
+  public void setUp() {
+    mbServer = ManagementFactory.getPlatformMBeanServer();
+    random = new Random(System.nanoTime());
+  }
+
+  @Test
+  public void testSinkCounter() throws Exception {
+    String name = getRandomName();
+
+    SinkCounter skc = new SinkCounter(name);
+    ObjectName on = new ObjectName(SINK_OBJ_NAME_PREFIX + name);
+    assertSkCounterState(on, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L);
+
+    skc.start();
+    long start1 = getStartTime(on);
+
+    Assert.assertTrue("StartTime", start1 != 0L);
+    Assert.assertTrue("StopTime", getStopTime(on) == 0L);
+
+    int connCreated = random.nextInt(MAX_BOUNDS);
+    int connClosed = random.nextInt(MAX_BOUNDS);
+    int connFailed = random.nextInt(MAX_BOUNDS);
+    int batchEmpty = random.nextInt(MAX_BOUNDS);
+    int batchUnderflow = random.nextInt(MAX_BOUNDS);
+    int batchComplete = random.nextInt(MAX_BOUNDS);
+    int eventDrainAttempt = random.nextInt(MAX_BOUNDS);
+    int eventDrainSuccess = random.nextInt(MAX_BOUNDS);
+
+    for (int i = 0; i<connCreated; i++) {
+      skc.incrementConnectionCreatedCount();
+    }
+    for (int i = 0; i<connClosed; i++) {
+      skc.incrementConnectionClosedCount();
+    }
+    for (int i = 0; i<connFailed; i++) {
+      skc.incrementConnectionFailedCount();
+    }
+    for (int i = 0; i<batchEmpty; i++) {
+      skc.incrementBatchEmptyCount();
+    }
+    for (int i = 0; i<batchUnderflow; i++) {
+      skc.incrementBatchUnderflowCount();
+    }
+    for (int i = 0; i<batchComplete; i++) {
+      skc.incrementBatchCompleteCount();
+    }
+    for (int i = 0; i<eventDrainAttempt; i++) {
+      skc.incrementEventDrainAttemptCount();
+    }
+    for (int i = 0; i<eventDrainSuccess; i++) {
+      skc.incrementEventDrainSuccessCount();
+    }
+
+    assertSkCounterState(on, connCreated, connClosed, connFailed, batchEmpty,
+        batchUnderflow, batchComplete, eventDrainAttempt, eventDrainSuccess);
+
+    skc.stop();
+
+    Assert.assertTrue("StartTime", getStartTime(on) != 0L);
+    Assert.assertTrue("StopTime", getStopTime(on) != 0L);
+
+    assertSkCounterState(on, connCreated, connClosed, connFailed, batchEmpty,
+        batchUnderflow, batchComplete, eventDrainAttempt, eventDrainSuccess);
+
+    skc.start();
+    Assert.assertTrue("StartTime", getStartTime(on) != 0L);
+    Assert.assertTrue("StartTime", getStartTime(on) > start1);
+    Assert.assertTrue("StopTime", getStopTime(on) == 0L);
+
+    assertSkCounterState(on, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L);
+
+    int eventDrainAttempt2 = random.nextInt(MAX_BOUNDS);
+    int eventDrainSuccess2 = random.nextInt(MAX_BOUNDS);
+
+    skc.addToEventDrainAttemptCount(eventDrainAttempt2);
+    skc.addToEventDrainSuccessCount(eventDrainSuccess2);
+
+    assertSkCounterState(on, 0L, 0L, 0L, 0L, 0L, 0L,
+        eventDrainAttempt2, eventDrainSuccess2);
+  }
+
+  @Test
+  public void testChannelCounter() throws Exception {
+    String name = getRandomName();
+
+    ChannelCounter chc = new ChannelCounter(name);
+    ObjectName on = new ObjectName(CHANNEL_OBJ_NAME_PREFIX + name);
+    assertChCounterState(on, 0L, 0L, 0L, 0L, 0L);
+
+    Assert.assertTrue("StartTime", getStartTime(on) == 0L);
+    Assert.assertTrue("StopTime", getStopTime(on) == 0L);
+
+    chc.start();
+
+    long start1 = getStartTime(on);
+
+    Assert.assertTrue("StartTime", start1 != 0L);
+    Assert.assertTrue("StopTime", getStopTime(on) == 0L);
+
+    int numChannelSize = random.nextInt(MAX_BOUNDS);
+    int numEventPutAttempt = random.nextInt(MAX_BOUNDS);
+    int numEventTakeAttempt = random.nextInt(MAX_BOUNDS);
+    int numEventPutSuccess = random.nextInt(MAX_BOUNDS);
+    int numEventTakeSuccess = random.nextInt(MAX_BOUNDS);
+
+    chc.setChannelSize(numChannelSize);
+    for (int i = 0; i<numEventPutAttempt; i++) {
+      chc.incrementEventPutAttemptCount();
+    }
+    for (int i = 0; i<numEventTakeAttempt; i++) {
+      chc.incrementEventTakeAttemptCount();
+    }
+    chc.addToEventPutSuccessCount(numEventPutSuccess);
+    chc.addToEventTakeSuccessCount(numEventTakeSuccess);
+
+    assertChCounterState(on, numChannelSize, numEventPutAttempt,
+        numEventTakeAttempt, numEventPutSuccess, numEventTakeSuccess);
+
+    chc.stop();
+
+    Assert.assertTrue("StartTime", getStartTime(on) != 0L);
+    Assert.assertTrue("StopTime", getStopTime(on) != 0L);
+
+    assertChCounterState(on, numChannelSize, numEventPutAttempt,
+        numEventTakeAttempt, numEventPutSuccess, numEventTakeSuccess);
+
+    chc.start();
+    Assert.assertTrue("StartTime", getStartTime(on) != 0L);
+    Assert.assertTrue("StartTime", getStartTime(on) > start1);
+    Assert.assertTrue("StopTime", getStopTime(on) == 0L);
+
+    assertChCounterState(on, 0L, 0L, 0L, 0L, 0L);
+  }
+
+  @Test
+  public void testSourceCounter() throws Exception {
+    String name = getRandomName();
+
+    SourceCounter srcc = new SourceCounter(name);
+    ObjectName on = new ObjectName(SOURCE_OBJ_NAME_PREFIX + name);
+
+    assertSrcCounterState(on, 0L, 0L, 0L, 0L, 0L, 0L);
+
+    Assert.assertTrue("StartTime", getStartTime(on) == 0L);
+    Assert.assertTrue("StopTime", getStopTime(on) == 0L);
+
+    srcc.start();
+
+    long start1 = getStartTime(on);
+
+    Assert.assertTrue("StartTime", start1 != 0L);
+    Assert.assertTrue("StopTime", getStopTime(on) == 0L);
+
+    int numEventReceived = random.nextInt(MAX_BOUNDS);
+    int numEventAccepted = random.nextInt(MAX_BOUNDS);
+    int numAppendReceived = random.nextInt(MAX_BOUNDS);
+    int numAppendAccepted = random.nextInt(MAX_BOUNDS);
+    int numAppendBatchReceived = random.nextInt(MAX_BOUNDS);
+    int numAppendBatchAccepted = random.nextInt(MAX_BOUNDS);
+
+    srcc.addToEventReceivedCount(numEventReceived);
+    srcc.addToEventAcceptedCount(numEventAccepted);
+    for (int i = 0; i<numAppendReceived; i++) {
+      srcc.incrementAppendReceivedCount();
+    }
+    for (int i = 0; i<numAppendAccepted; i++) {
+      srcc.incrementAppendAcceptedCount();
+    }
+    for (int i = 0; i<numAppendBatchReceived; i++) {
+      srcc.incrementAppendBatchReceivedCount();
+    }
+    for (int i = 0; i<numAppendBatchAccepted; i++) {
+      srcc.incrementAppendBatchAcceptedCount();
+    }
+
+    assertSrcCounterState(on, numEventReceived, numEventAccepted,
+        numAppendReceived, numAppendAccepted, numAppendBatchReceived,
+        numAppendBatchAccepted);
+
+    srcc.stop();
+    Assert.assertTrue("StartTime", getStartTime(on) != 0L);
+    Assert.assertTrue("StopTime", getStopTime(on) != 0L);
+
+    assertSrcCounterState(on, numEventReceived, numEventAccepted,
+        numAppendReceived, numAppendAccepted, numAppendBatchReceived,
+        numAppendBatchAccepted);
+
+    srcc.start();
+    Assert.assertTrue("StartTime", getStartTime(on) != 0L);
+    Assert.assertTrue("StartTime", getStartTime(on) > start1);
+    Assert.assertTrue("StopTime", getStopTime(on) == 0L);
+
+    assertSrcCounterState(on, 0L, 0L, 0L, 0L, 0L, 0L);
+
+    int numEventReceived2 = random.nextInt(MAX_BOUNDS);
+    int numEventAccepted2 = random.nextInt(MAX_BOUNDS);
+
+    for (int i = 0; i<numEventReceived2; i++) {
+      srcc.incrementEventReceivedCount();
+    }
+
+    for (int i = 0; i<numEventAccepted2; i++) {
+      srcc.incrementEventAcceptedCount();
+    }
+
+    assertSrcCounterState(on, numEventReceived2, numEventAccepted2,
+        0L, 0L, 0L, 0L);
+  }
+
+  /**
+   * Asserts that the source counter values reported for {@code on} match
+   * the given expectations. The wanted value is handed to JUnit in the
+   * "expected" position and the reported value in the "actual" position,
+   * so a failure message labels the two numbers correctly.
+   */
+  private void assertSrcCounterState(ObjectName on, long eventReceivedCount,
+      long eventAcceptedCount, long appendReceivedCount,
+      long appendAcceptedCount, long appendBatchReceivedCount,
+      long appendBatchAcceptedCount) throws Exception {
+    Assert.assertEquals("SrcEventReceived",
+        eventReceivedCount, getSrcEventReceivedCount(on));
+    Assert.assertEquals("SrcEventAccepted",
+        eventAcceptedCount, getSrcEventAcceptedCount(on));
+    Assert.assertEquals("SrcAppendReceived",
+        appendReceivedCount, getSrcAppendReceivedCount(on));
+    Assert.assertEquals("SrcAppendAccepted",
+        appendAcceptedCount, getSrcAppendAcceptedCount(on));
+    Assert.assertEquals("SrcAppendBatchReceived",
+        appendBatchReceivedCount, getSrcAppendBatchReceivedCount(on));
+    Assert.assertEquals("SrcAppendBatchAccepted",
+        appendBatchAcceptedCount, getSrcAppendBatchAcceptedCount(on));
+  }
+
+  /**
+   * Asserts that the channel counter values reported for {@code on} match
+   * the given expectations. Wanted values are passed as JUnit's "expected"
+   * argument and reported values as "actual", so failures read correctly.
+   */
+  private void assertChCounterState(ObjectName on, long channelSize,
+      long eventPutAttempt, long eventTakeAttempt, long eventPutSuccess,
+      long eventTakeSuccess) throws Exception {
+    Assert.assertEquals("ChChannelSize",
+        channelSize, getChChannelSize(on));
+    Assert.assertEquals("ChEventPutAttempt",
+        eventPutAttempt, getChEventPutAttempt(on));
+    Assert.assertEquals("ChEventTakeAttempt",
+        eventTakeAttempt, getChEventTakeAttempt(on));
+    Assert.assertEquals("ChEventPutSuccess",
+        eventPutSuccess, getChEventPutSuccess(on));
+    Assert.assertEquals("ChEventTakeSuccess",
+        eventTakeSuccess, getChEventTakeSuccess(on));
+  }
+
+  /**
+   * Asserts that each sink counter attribute read for {@code on} through
+   * the getSk* helpers equals the corresponding expected value.
+   *
+   * The expected value is passed to JUnit before the value that was read,
+   * so a failure reports "expected" and "actual" the right way round.
+   *
+   * @throws Exception if reading any attribute fails
+   */
+  private void assertSkCounterState(ObjectName on, long connCreated,
+      long connClosed, long connFailed, long batchEmpty, long batchUnderflow,
+      long batchComplete, long eventDrainAttempt, long eventDrainSuccess)
+        throws Exception {
+    Assert.assertEquals("SkConnCreated",
+        connCreated,
+        getSkConnectionCreated(on));
+    Assert.assertEquals("SkConnClosed",
+        connClosed,
+        getSkConnectionClosed(on));
+    Assert.assertEquals("SkConnFailed",
+        connFailed,
+        getSkConnectionFailed(on));
+    Assert.assertEquals("SkBatchEmpty",
+        batchEmpty,
+        getSkBatchEmpty(on));
+    Assert.assertEquals("SkBatchUnderflow",
+        batchUnderflow,
+        getSkBatchUnderflow(on));
+    Assert.assertEquals("SkBatchComplete",
+        batchComplete,
+        getSkBatchComplete(on));
+    Assert.assertEquals("SkEventDrainAttempt",
+        eventDrainAttempt,
+        getSkEventDrainAttempt(on));
+    Assert.assertEquals("SkEventDrainSuccess",
+        eventDrainSuccess,
+        getSkEventDrainSuccess(on));
+  }
+
+  private long getStartTime(ObjectName on) throws Exception {
+    return getLongAttribute(on, ATTR_START_TIME);
+  }
+
+  private long getStopTime(ObjectName on) throws Exception {
+    return getLongAttribute(on, ATTR_STOP_TIME);
+  }
+
+  private long getSkConnectionCreated(ObjectName on) throws Exception {
+    return getLongAttribute(on, SK_ATTR_CONN_CREATED);
+  }
+
+  private long getSkConnectionClosed(ObjectName on) throws Exception {
+    return getLongAttribute(on, SK_ATTR_CONN_CLOSED);
+  }
+
+  private long getSkConnectionFailed(ObjectName on) throws Exception {
+    return getLongAttribute(on, SK_ATTR_CONN_FAILED);
+  }
+
+  private long getSkBatchEmpty(ObjectName on) throws Exception {
+    return getLongAttribute(on, SK_ATTR_BATCH_EMPTY);
+  }
+
+  private long getSkBatchUnderflow(ObjectName on) throws Exception {
+    return getLongAttribute(on, SK_ATTR_BATCH_UNDERFLOW);
+  }
+
+  private long getSkBatchComplete(ObjectName on) throws Exception {
+    return getLongAttribute(on, SK_ATTR_BATCH_COMPLETE);
+  }
+
+  private long getSkEventDrainAttempt(ObjectName on) throws Exception {
+    return getLongAttribute(on, SK_ATTR_EVENT_DRAIN_ATTEMPT);
+  }
+
+  private long getSkEventDrainSuccess(ObjectName on) throws Exception {
+    return getLongAttribute(on, SK_ATTR_EVENT_DRAIN_SUCCESS);
+  }
+
+  private long getChChannelSize(ObjectName on) throws Exception {
+    return getLongAttribute(on, CH_ATTR_CHANNEL_SIZE);
+  }
+
+  private long getChEventPutAttempt(ObjectName on) throws Exception {
+    return getLongAttribute(on, CH_ATTR_EVENT_PUT_ATTEMPT);
+  }
+
+  private long getChEventTakeAttempt(ObjectName on) throws Exception {
+    return getLongAttribute(on, CH_ATTR_EVENT_TAKE_ATTEMPT);
+  }
+
+  private long getChEventPutSuccess(ObjectName on) throws Exception {
+    return getLongAttribute(on, CH_ATTR_EVENT_PUT_SUCCESS);
+  }
+
+  private long getChEventTakeSuccess(ObjectName on) throws Exception {
+    return getLongAttribute(on, CH_ATTR_EVENT_TAKE_SUCCESS);
+  }
+
+  private long getSrcAppendBatchAcceptedCount(ObjectName on) throws Exception {
+    return getLongAttribute(on, SRC_ATTR_APPEND_BATCH_ACCEPTED_COUNT);
+  }
+
+  private long getSrcAppendBatchReceivedCount(ObjectName on) throws Exception {
+    return getLongAttribute(on, SRC_ATTR_APPEND_BATCH_RECEVIED_COUNT);
+  }
+
+  private long getSrcAppendAcceptedCount(ObjectName on) throws Exception {
+    return getLongAttribute(on, SRC_ATTR_APPEND_ACCEPTED_COUNT);
+  }
+
+  private long getSrcAppendReceivedCount(ObjectName on) throws Exception {
+    return getLongAttribute(on, SRC_ATTR_APPEND_RECEVIED_COUNT);
+  }
+
+  private long getSrcEventAcceptedCount(ObjectName on) throws Exception {
+    return getLongAttribute(on, SRC_ATTR_EVENT_ACCEPTED_COUNT);
+  }
+
+  private long getSrcEventReceivedCount(ObjectName on) throws Exception {
+    return getLongAttribute(on, SRC_ATTR_EVENT_RECEVIED_COUNT);
+  }
+
+  private long getLongAttribute(ObjectName on, String attr) throws Exception {
+    // The attribute value is cast to a boxed Long and unboxed for the caller.
+    final Long boxed = (Long) getAttribute(on, attr);
+    return boxed.longValue();
+  }
+
+  private Object getAttribute(ObjectName objName, String attrName)
+      throws Exception {
+    return mbServer.getAttribute(objName, attrName);
+  }
+
+  private String getRandomName() {
+    return "random-" + System.nanoTime();
+  }
+
+}

Modified: incubator/flume/branches/branch-1.2.0/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java?rev=1358458&r1=1358457&r2=1358458&view=diff
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java (original)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java Fri Jul  6 22:30:11 2012
@@ -26,8 +26,10 @@ import java.util.concurrent.ScheduledExe
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.flume.Context;
 import org.apache.flume.Event;
+import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.FlumeFormatter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -78,11 +80,13 @@ class BucketWriter {
   private volatile long batchCounter;
   private volatile boolean isOpen;
   private volatile ScheduledFuture<Void> timedRollFuture;
+  private SinkCounter sinkCounter;
 
   BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize,
       Context context, String filePath, CompressionCodec codeC,
       CompressionType compType, HDFSWriter writer, FlumeFormatter formatter,
-      ScheduledExecutorService timedRollerPool, UserGroupInformation user) {
+      ScheduledExecutorService timedRollerPool, UserGroupInformation user,
+      SinkCounter sinkCounter) {
     this.rollInterval = rollInterval;
     this.rollSize = rollSize;
     this.rollCount = rollCount;
@@ -95,6 +99,7 @@ class BucketWriter {
     this.formatter = formatter;
     this.timedRollerPool = timedRollerPool;
     this.user = user;
+    this.sinkCounter = sinkCounter;
 
     fileExtensionCounter = new AtomicLong(System.currentTimeMillis());
 
@@ -172,24 +177,33 @@ class BucketWriter {
     // NOTE: tried synchronizing on the underlying Kerberos principal previously
     // which caused deadlocks. See FLUME-1231.
     synchronized (staticLock) {
-      long counter = fileExtensionCounter.incrementAndGet();
-      if (codeC == null) {
-        bucketPath = filePath + "." + counter;
-        // Need to get reference to FS using above config before underlying
-        // writer does in order to avoid shutdown hook & IllegalStateExceptions
-        fileSystem = new Path(bucketPath).getFileSystem(config);
-        LOG.info("Creating " + bucketPath + IN_USE_EXT);
-        writer.open(bucketPath + IN_USE_EXT, formatter);
-      } else {
-        bucketPath = filePath + "." + counter
-            + codeC.getDefaultExtension();
-        // need to get reference to FS before writer does to avoid shutdown hook
-        fileSystem = new Path(bucketPath).getFileSystem(config);
-        LOG.info("Creating " + bucketPath + IN_USE_EXT);
-        writer.open(bucketPath + IN_USE_EXT, codeC, compType, formatter);
+      try {
+        long counter = fileExtensionCounter.incrementAndGet();
+        if (codeC == null) {
+          bucketPath = filePath + "." + counter;
+          // Need to get reference to FS using above config before underlying
+          // writer does in order to avoid shutdown hook & IllegalStateExceptions
+          fileSystem = new Path(bucketPath).getFileSystem(config);
+          LOG.info("Creating " + bucketPath + IN_USE_EXT);
+          writer.open(bucketPath + IN_USE_EXT, formatter);
+        } else {
+          bucketPath = filePath + "." + counter
+              + codeC.getDefaultExtension();
+          // need to get reference to FS before writer does to avoid shutdown hook
+          fileSystem = new Path(bucketPath).getFileSystem(config);
+          LOG.info("Creating " + bucketPath + IN_USE_EXT);
+          writer.open(bucketPath + IN_USE_EXT, codeC, compType, formatter);
+        }
+      } catch (Exception ex) {
+        sinkCounter.incrementConnectionFailedCount();
+        if (ex instanceof IOException) {
+          throw (IOException) ex;
+        } else {
+          throw new IOException(ex);
+        }
       }
     }
-
+    sinkCounter.incrementConnectionCreatedCount();
     resetCounters();
 
     // if time-based rolling is enabled, schedule the roll
@@ -234,9 +248,11 @@ class BucketWriter {
     if (isOpen) {
       try {
         writer.close(); // could block
+        sinkCounter.incrementConnectionClosedCount();
       } catch (IOException e) {
         LOG.warn("failed to close() HDFSWriter for file (" + bucketPath +
             IN_USE_EXT + "). Exception follows.", e);
+        sinkCounter.incrementConnectionFailedCount();
       }
       isOpen = false;
     } else {
@@ -299,6 +315,7 @@ class BucketWriter {
 
     // write the event
     try {
+      sinkCounter.incrementEventDrainAttemptCount();
       writer.append(event, formatter); // could block
     } catch (IOException e) {
       LOG.warn("Caught IOException writing to HDFSWriter ({}). Closing file (" +

Modified: incubator/flume/branches/branch-1.2.0/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java?rev=1358458&r1=1358457&r2=1358458&view=diff
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java (original)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java Fri Jul  6 22:30:11 2012
@@ -20,7 +20,6 @@ package org.apache.flume.sink.hdfs;
 
 import java.io.File;
 import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.LinkedHashMap;
@@ -32,6 +31,7 @@ import java.util.concurrent.ExecutionExc
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
@@ -43,6 +43,7 @@ import org.apache.flume.EventDeliveryExc
 import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.formatter.output.BucketPath;
+import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.AbstractSink;
 import org.apache.flume.sink.FlumeFormatter;
 import org.apache.hadoop.conf.Configuration;
@@ -58,7 +59,6 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.util.concurrent.ScheduledExecutorService;
 
 public class HDFSEventSink extends AbstractSink implements Configurable {
   private static final Logger LOG = LoggerFactory
@@ -124,6 +124,7 @@ public class HDFSEventSink extends Abstr
 
   private long callTimeout;
   private Context context;
+  private SinkCounter sinkCounter;
 
   /*
    * Extended Java LinkedHashMap for open file handle LRU queue.
@@ -263,6 +264,10 @@ public class HDFSEventSink extends Abstr
             "must be > 0 and <= 24");
       }
     }
+
+    if (sinkCounter == null) {
+      sinkCounter = new SinkCounter(getName());
+    }
   }
 
   private static boolean codecMatches(Class<? extends CompressionCodec> cls,
@@ -334,8 +339,10 @@ public class HDFSEventSink extends Abstr
       }
     } catch (TimeoutException eT) {
       future.cancel(true);
+      sinkCounter.incrementConnectionFailedCount();
       throw new IOException("Callable timed out", eT);
     } catch (ExecutionException e1) {
+      sinkCounter.incrementConnectionFailedCount();
       Throwable cause = e1.getCause();
       if (cause instanceof IOException) {
         throw (IOException) cause;
@@ -371,7 +378,8 @@ public class HDFSEventSink extends Abstr
     transaction.begin();
     try {
       Event event = null;
-      for (int txnEventCount = 0; txnEventCount < txnEventMax; txnEventCount++) {
+      int txnEventCount = 0;
+      for (txnEventCount = 0; txnEventCount < txnEventMax; txnEventCount++) {
         event = channel.take();
         if (event == null) {
           break;
@@ -390,7 +398,7 @@ public class HDFSEventSink extends Abstr
 
           bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount,
               batchSize, context, realPath, codeC, compType, hdfsWriter,
-              formatter, timedRollerPool, proxyTicket);
+              formatter, timedRollerPool, proxyTicket, sinkCounter);
 
           sfWriters.put(realPath, bucketWriter);
         }
@@ -404,6 +412,14 @@ public class HDFSEventSink extends Abstr
         append(bucketWriter, event);
       }
 
+      if (txnEventCount == 0) {
+        sinkCounter.incrementBatchEmptyCount();
+      } else if (txnEventCount == txnEventMax) {
+        sinkCounter.incrementBatchCompleteCount();
+      } else {
+        sinkCounter.incrementBatchUnderflowCount();
+      }
+
       // flush all pending buckets before committing the transaction
       for (BucketWriter bucketWriter : writers) {
         if (!bucketWriter.isBatchComplete()) {
@@ -412,6 +428,11 @@ public class HDFSEventSink extends Abstr
       }
 
       transaction.commit();
+
+      if (txnEventCount > 0) {
+        sinkCounter.addToEventDrainSuccessCount(txnEventCount);
+      }
+
       if(event == null) {
         return Status.BACKOFF;
       }
@@ -470,7 +491,7 @@ public class HDFSEventSink extends Abstr
 
     sfWriters.clear();
     sfWriters = null;
-
+    sinkCounter.stop();
     super.stop();
   }
 
@@ -485,6 +506,7 @@ public class HDFSEventSink extends Abstr
         new ThreadFactoryBuilder().setNameFormat(rollerName).build());
 
     this.sfWriters = new WriterLinkedHashMap(maxOpenFiles);
+    sinkCounter.start();
     super.start();
   }
 
@@ -512,7 +534,7 @@ public class HDFSEventSink extends Abstr
         //HDFSEventSink will halt when keytab file is non-exist or unreadable
         File kfile = new File(kerbKeytab);
         if (!(kfile.isFile() && kfile.canRead())) {
-          throw new IllegalArgumentException("The keyTab file: " 
+          throw new IllegalArgumentException("The keyTab file: "
               + kerbKeytab + " is nonexistent or can't read. "
               + "Please specify a readable keytab file for Kerberos auth.");
         }

Modified: incubator/flume/branches/branch-1.2.0/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/branch-1.2.0/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java?rev=1358458&r1=1358457&r2=1358458&view=diff
==============================================================================
--- incubator/flume/branches/branch-1.2.0/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java (original)
+++ incubator/flume/branches/branch-1.2.0/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java Fri Jul  6 22:30:11 2012
@@ -18,14 +18,15 @@
  */
 package org.apache.flume.sink.hdfs;
 
-import com.google.common.base.Charsets;
 import java.io.IOException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.hadoop.io.SequenceFile;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -34,6 +35,8 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Charsets;
+
 public class TestBucketWriter {
 
   private static Logger logger =
@@ -61,7 +64,8 @@ public class TestBucketWriter {
     HDFSTextFormatter formatter = new HDFSTextFormatter();
     BucketWriter bucketWriter = new BucketWriter(0, 0, maxEvents, 0, ctx,
         "/tmp/file", null, SequenceFile.CompressionType.NONE, hdfsWriter,
-        formatter, timedRollerPool, null);
+        formatter, timedRollerPool, null,
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()));
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     for (int i = 0; i < 1000; i++) {
@@ -84,7 +88,8 @@ public class TestBucketWriter {
     HDFSTextFormatter formatter = new HDFSTextFormatter();
     BucketWriter bucketWriter = new BucketWriter(0, maxBytes, 0, 0, ctx,
         "/tmp/file", null, SequenceFile.CompressionType.NONE, hdfsWriter,
-        formatter, timedRollerPool, null);
+        formatter, timedRollerPool, null,
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()));
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     for (int i = 0; i < 1000; i++) {
@@ -109,7 +114,8 @@ public class TestBucketWriter {
     HDFSTextFormatter formatter = new HDFSTextFormatter();
     BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
         "/tmp/file", null, SequenceFile.CompressionType.NONE, hdfsWriter,
-        formatter, timedRollerPool, null);
+        formatter, timedRollerPool, null,
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()));
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     long startNanos = System.nanoTime();