You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2013/04/18 09:46:20 UTC

git commit: FLUME-2004. Capture JMX metrics for Exec source.

Updated Branches:
  refs/heads/trunk 1074fd05b -> 41ca44be5


FLUME-2004. Capture JMX metrics for Exec source.

(Venkatesh Sivasubramanian via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/41ca44be
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/41ca44be
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/41ca44be

Branch: refs/heads/trunk
Commit: 41ca44be52e65845b359307f637282d794a345e4
Parents: 1074fd0
Author: Mike Percy <mp...@apache.org>
Authored: Thu Apr 18 00:40:46 2013 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Thu Apr 18 00:40:46 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/flume/source/ExecSource.java   |   25 ++++--
 .../org/apache/flume/source/TestExecSource.java    |   58 +++++++++++++++
 2 files changed, 74 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/41ca44be/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
index 8e687f2..3c9437d 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
@@ -31,13 +31,13 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
-import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
 import org.apache.flume.EventDrivenSource;
 import org.apache.flume.Source;
 import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SourceCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -142,7 +142,7 @@ Configurable {
 
   private String shell;
   private String command;
-  private CounterGroup counterGroup;
+  private SourceCounter sourceCounter;
   private ExecutorService executor;
   private Future<?> runnerFuture;
   private long restartThrottle;
@@ -157,9 +157,8 @@ Configurable {
     logger.info("Exec source starting with command:{}", command);
 
     executor = Executors.newSingleThreadExecutor();
-    counterGroup = new CounterGroup();
 
-    runner = new ExecRunnable(shell, command, getChannelProcessor(), counterGroup,
+    runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter,
         restart, restartThrottle, logStderr, bufferCount, charset);
 
     // FIXME: Use a callback-like executor / future to signal us upon failure.
@@ -170,6 +169,7 @@ Configurable {
      * it sets our state to running. We want to make sure the executor is alive
      * and well first.
      */
+    sourceCounter.start();
     super.start();
 
     logger.debug("Exec source started");
@@ -202,10 +202,11 @@ Configurable {
       }
     }
 
+    sourceCounter.stop();
     super.stop();
 
     logger.debug("Exec source with command:{} stopped. Metrics:{}", command,
-        counterGroup);
+        sourceCounter);
   }
 
   @Override
@@ -231,16 +232,20 @@ Configurable {
         ExecSourceConfigurationConstants.DEFAULT_CHARSET));
 
     shell = context.getString(ExecSourceConfigurationConstants.CONFIG_SHELL, null);
+
+    if (sourceCounter == null) {
+      sourceCounter = new SourceCounter(getName());
+    }
   }
 
   private static class ExecRunnable implements Runnable {
 
     public ExecRunnable(String shell, String command, ChannelProcessor channelProcessor,
-        CounterGroup counterGroup, boolean restart, long restartThrottle,
+        SourceCounter sourceCounter, boolean restart, long restartThrottle,
         boolean logStderr, int bufferCount, Charset charset) {
       this.command = command;
       this.channelProcessor = channelProcessor;
-      this.counterGroup = counterGroup;
+      this.sourceCounter = sourceCounter;
       this.restartThrottle = restartThrottle;
       this.bufferCount = bufferCount;
       this.restart = restart;
@@ -252,7 +257,7 @@ Configurable {
     private final String shell;
     private final String command;
     private final ChannelProcessor channelProcessor;
-    private final CounterGroup counterGroup;
+    private final SourceCounter sourceCounter;
     private volatile boolean restart;
     private final long restartThrottle;
     private final int bufferCount;
@@ -286,15 +291,17 @@ Configurable {
           String line = null;
           List<Event> eventList = new ArrayList<Event>();
           while ((line = reader.readLine()) != null) {
-            counterGroup.incrementAndGet("exec.lines.read");
+            sourceCounter.incrementEventReceivedCount();
             eventList.add(EventBuilder.withBody(line.getBytes(charset)));
             if(eventList.size() >= bufferCount) {
               channelProcessor.processEventBatch(eventList);
+              sourceCounter.addToEventAcceptedCount(eventList.size());
               eventList.clear();
             }
           }
           if(!eventList.isEmpty()) {
             channelProcessor.processEventBatch(eventList);
+            sourceCounter.addToEventAcceptedCount(eventList.size());
           }
         } catch (Exception e) {
           logger.error("Failed while running command: " + command, e);

http://git-wip-us.apache.org/repos/asf/flume/blob/41ca44be/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
index 3d524f0..77e9a44 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
@@ -23,10 +23,16 @@ package org.apache.flume.source;
 import static org.junit.Assert.*;
 
 import java.io.*;
+import java.lang.management.ManagementFactory;
 import java.nio.charset.Charset;
 import java.util.*;
 import java.util.regex.Pattern;
 
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelSelector;
@@ -70,6 +76,18 @@ public class TestExecSource {
   @After
   public void tearDown() {
     source.stop();
+
+    // Remove the MBean registered for Monitoring
+    ObjectName objName = null;
+    try {
+        objName = new ObjectName("org.apache.flume.source"
+          + ":type=" + source.getName());
+
+        ManagementFactory.getPlatformMBeanServer().unregisterMBean(objName);
+    } catch (Exception ex) {
+      System.out.println("Failed to unregister the monitored counter: "
+          + objName + ex.getMessage());
+    }
   }
 
   @Test
@@ -168,6 +186,46 @@ public class TestExecSource {
       }
     }
 
+  @Test
+  public void testMonitoredCounterGroup() throws InterruptedException, LifecycleException,
+  EventDeliveryException, IOException {
+    // mini script
+    runTestShellCmdHelper("/bin/bash -c", "for i in {1..5}; do echo $i;done"
+            , new String[]{"1","2","3","4","5" } );
+
+    ObjectName objName = null;
+
+    try {
+        objName = new ObjectName("org.apache.flume.source"
+          + ":type=" + source.getName());
+
+        MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
+        String strAtts[] = {"Type", "EventReceivedCount", "EventAcceptedCount"};
+        AttributeList attrList = mbeanServer.getAttributes(objName, strAtts);
+
+        Assert.assertNotNull(attrList.get(0));
+        Assert.assertEquals("Expected Value: Type", "Type",
+                ((Attribute) attrList.get(0)).getName());
+        Assert.assertEquals("Expected Value: SOURCE", "SOURCE",
+                ((Attribute) attrList.get(0)).getValue());
+
+        Assert.assertNotNull(attrList.get(1));
+        Assert.assertEquals("Expected Value: EventReceivedCount", "EventReceivedCount",
+                ((Attribute) attrList.get(1)).getName());
+        Assert.assertEquals("Expected Value: 5", "5",
+                ((Attribute) attrList.get(1)).getValue().toString());
+
+        Assert.assertNotNull(attrList.get(2));
+        Assert.assertEquals("Expected Value: EventAcceptedCount", "EventAcceptedCount",
+                ((Attribute) attrList.get(2)).getName());
+        Assert.assertEquals("Expected Value: 5", "5",
+                ((Attribute) attrList.get(2)).getValue().toString());
+
+    } catch (Exception ex) {
+      System.out.println("Unable to retreive the monitored counter: "
+          + objName + ex.getMessage());
+    }
+  }
 
     private void runTestShellCmdHelper(String shell, String command, String[] expectedOutput)
              throws InterruptedException, LifecycleException, EventDeliveryException, IOException {