You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2013/06/21 21:14:01 UTC

git commit: FLUME-2081. JMX metrics support for SpoolDir.

Updated Branches:
  refs/heads/trunk b455486c2 -> 862c83187


FLUME-2081. JMX metrics support for SpoolDir.

(Sravya Tirukkovalur via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/862c8318
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/862c8318
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/862c8318

Branch: refs/heads/trunk
Commit: 862c83187a1ea443142b49df8e86671e55d927d6
Parents: b455486
Author: Mike Percy <mp...@apache.org>
Authored: Fri Jun 21 12:12:40 2013 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Fri Jun 21 12:12:40 2013 -0700

----------------------------------------------------------------------
 .../flume/source/SpoolDirectorySource.java      | 25 +++++++++++++-------
 1 file changed, 17 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/862c8318/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
index 641b5c6..7145580 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.flume.*;
 import org.apache.flume.client.avro.ReliableSpoolingFileEventReader;
 import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.SourceCounter;
 import org.apache.flume.serialization.LineDeserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,7 +59,7 @@ Configurable, EventDrivenSource {
   private String deletePolicy;
   private String inputCharset;
 
-  private CounterGroup counterGroup;
+  private SourceCounter sourceCounter;
   ReliableSpoolingFileEventReader reader;
 
   @Override
@@ -68,7 +69,6 @@ Configurable, EventDrivenSource {
 
     ScheduledExecutorService executor =
         Executors.newSingleThreadScheduledExecutor();
-    counterGroup = new CounterGroup();
 
     File directory = new File(spoolDirectory);
     try {
@@ -89,17 +89,21 @@ Configurable, EventDrivenSource {
           ioe);
     }
 
-    Runnable runner = new SpoolDirectoryRunnable(reader, counterGroup);
+    Runnable runner = new SpoolDirectoryRunnable(reader, sourceCounter);
     executor.scheduleWithFixedDelay(
         runner, 0, POLL_DELAY_MS, TimeUnit.MILLISECONDS);
 
     super.start();
     logger.debug("SpoolDirectorySource source started");
+    sourceCounter.start();
   }
 
   @Override
   public void stop() {
     super.stop();
+    sourceCounter.stop();
+    logger.info("SpoolDir source {} stopped. Metrics: {}", getName(),
+      sourceCounter);
   }
 
   @Override
@@ -134,17 +138,19 @@ Configurable, EventDrivenSource {
       deserializerContext.put(LineDeserializer.MAXLINE_KEY,
           bufferMaxLineLength.toString());
     }
-
+    if (sourceCounter == null) {
+      sourceCounter = new SourceCounter(getName());
+    }
   }
 
   private class SpoolDirectoryRunnable implements Runnable {
     private ReliableSpoolingFileEventReader reader;
-    private CounterGroup counterGroup;
+    private SourceCounter sourceCounter;
 
     public SpoolDirectoryRunnable(ReliableSpoolingFileEventReader reader,
-        CounterGroup counterGroup) {
+        SourceCounter sourceCounter) {
       this.reader = reader;
-      this.counterGroup = counterGroup;
+      this.sourceCounter = sourceCounter;
     }
 
     @Override
@@ -155,10 +161,13 @@ Configurable, EventDrivenSource {
           if (events.isEmpty()) {
             break;
           }
-          counterGroup.addAndGet("spooler.events.read", (long) events.size());
+          sourceCounter.addToEventReceivedCount(events.size());
+          sourceCounter.incrementAppendBatchReceivedCount();
 
           getChannelProcessor().processEventBatch(events);
           reader.commit();
+          sourceCounter.addToEventAcceptedCount(events.size());
+          sourceCounter.incrementAppendBatchAcceptedCount();
         }
       } catch (Throwable t) {
         logger.error("Uncaught exception in Runnable", t);