You are viewing a plain text version of this content. The canonical version is the original message in the commits@flume.apache.org mailing-list archive.
Posted to commits@flume.apache.org by mp...@apache.org on 2013/04/18 09:46:20 UTC
git commit: FLUME-2004. Capture JMX metrics for Exec source.
Updated Branches:
refs/heads/trunk 1074fd05b -> 41ca44be5
FLUME-2004. Capture JMX metrics for Exec source.
(Venkatesh Sivasubramanian via Mike Percy)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/41ca44be
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/41ca44be
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/41ca44be
Branch: refs/heads/trunk
Commit: 41ca44be52e65845b359307f637282d794a345e4
Parents: 1074fd0
Author: Mike Percy <mp...@apache.org>
Authored: Thu Apr 18 00:40:46 2013 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Thu Apr 18 00:40:46 2013 -0700
----------------------------------------------------------------------
.../java/org/apache/flume/source/ExecSource.java | 25 ++++--
.../org/apache/flume/source/TestExecSource.java | 58 +++++++++++++++
2 files changed, 74 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/41ca44be/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
index 8e687f2..3c9437d 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
@@ -31,13 +31,13 @@ import java.util.concurrent.TimeUnit;
import org.apache.flume.Channel;
import org.apache.flume.Context;
-import org.apache.flume.CounterGroup;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.Source;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SourceCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -142,7 +142,7 @@ Configurable {
private String shell;
private String command;
- private CounterGroup counterGroup;
+ private SourceCounter sourceCounter;
private ExecutorService executor;
private Future<?> runnerFuture;
private long restartThrottle;
@@ -157,9 +157,8 @@ Configurable {
logger.info("Exec source starting with command:{}", command);
executor = Executors.newSingleThreadExecutor();
- counterGroup = new CounterGroup();
- runner = new ExecRunnable(shell, command, getChannelProcessor(), counterGroup,
+ runner = new ExecRunnable(shell, command, getChannelProcessor(), sourceCounter,
restart, restartThrottle, logStderr, bufferCount, charset);
// FIXME: Use a callback-like executor / future to signal us upon failure.
@@ -170,6 +169,7 @@ Configurable {
* it sets our state to running. We want to make sure the executor is alive
* and well first.
*/
+ sourceCounter.start();
super.start();
logger.debug("Exec source started");
@@ -202,10 +202,11 @@ Configurable {
}
}
+ sourceCounter.stop();
super.stop();
logger.debug("Exec source with command:{} stopped. Metrics:{}", command,
- counterGroup);
+ sourceCounter);
}
@Override
@@ -231,16 +232,20 @@ Configurable {
ExecSourceConfigurationConstants.DEFAULT_CHARSET));
shell = context.getString(ExecSourceConfigurationConstants.CONFIG_SHELL, null);
+
+ if (sourceCounter == null) {
+ sourceCounter = new SourceCounter(getName());
+ }
}
private static class ExecRunnable implements Runnable {
public ExecRunnable(String shell, String command, ChannelProcessor channelProcessor,
- CounterGroup counterGroup, boolean restart, long restartThrottle,
+ SourceCounter sourceCounter, boolean restart, long restartThrottle,
boolean logStderr, int bufferCount, Charset charset) {
this.command = command;
this.channelProcessor = channelProcessor;
- this.counterGroup = counterGroup;
+ this.sourceCounter = sourceCounter;
this.restartThrottle = restartThrottle;
this.bufferCount = bufferCount;
this.restart = restart;
@@ -252,7 +257,7 @@ Configurable {
private final String shell;
private final String command;
private final ChannelProcessor channelProcessor;
- private final CounterGroup counterGroup;
+ private final SourceCounter sourceCounter;
private volatile boolean restart;
private final long restartThrottle;
private final int bufferCount;
@@ -286,15 +291,17 @@ Configurable {
String line = null;
List<Event> eventList = new ArrayList<Event>();
while ((line = reader.readLine()) != null) {
- counterGroup.incrementAndGet("exec.lines.read");
+ sourceCounter.incrementEventReceivedCount();
eventList.add(EventBuilder.withBody(line.getBytes(charset)));
if(eventList.size() >= bufferCount) {
channelProcessor.processEventBatch(eventList);
+ sourceCounter.addToEventAcceptedCount(eventList.size());
eventList.clear();
}
}
if(!eventList.isEmpty()) {
channelProcessor.processEventBatch(eventList);
+ sourceCounter.addToEventAcceptedCount(eventList.size());
}
} catch (Exception e) {
logger.error("Failed while running command: " + command, e);
http://git-wip-us.apache.org/repos/asf/flume/blob/41ca44be/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
index 3d524f0..77e9a44 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestExecSource.java
@@ -23,10 +23,16 @@ package org.apache.flume.source;
import static org.junit.Assert.*;
import java.io.*;
+import java.lang.management.ManagementFactory;
import java.nio.charset.Charset;
import java.util.*;
import java.util.regex.Pattern;
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
import org.apache.commons.io.FileUtils;
import org.apache.flume.Channel;
import org.apache.flume.ChannelSelector;
@@ -70,6 +76,18 @@ public class TestExecSource {
@After
public void tearDown() {
source.stop();
+
+ // Remove the MBean registered for Monitoring
+ ObjectName objName = null;
+ try {
+ objName = new ObjectName("org.apache.flume.source"
+ + ":type=" + source.getName());
+
+ ManagementFactory.getPlatformMBeanServer().unregisterMBean(objName);
+ } catch (Exception ex) {
+ System.out.println("Failed to unregister the monitored counter: "
+ + objName + ex.getMessage());
+ }
}
@Test
@@ -168,6 +186,46 @@ public class TestExecSource {
}
}
+ @Test
+ public void testMonitoredCounterGroup() throws InterruptedException, LifecycleException,
+ EventDeliveryException, IOException {
+ // mini script
+ runTestShellCmdHelper("/bin/bash -c", "for i in {1..5}; do echo $i;done"
+ , new String[]{"1","2","3","4","5" } );
+
+ ObjectName objName = null;
+
+ try {
+ objName = new ObjectName("org.apache.flume.source"
+ + ":type=" + source.getName());
+
+ MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
+ String strAtts[] = {"Type", "EventReceivedCount", "EventAcceptedCount"};
+ AttributeList attrList = mbeanServer.getAttributes(objName, strAtts);
+
+ Assert.assertNotNull(attrList.get(0));
+ Assert.assertEquals("Expected Value: Type", "Type",
+ ((Attribute) attrList.get(0)).getName());
+ Assert.assertEquals("Expected Value: SOURCE", "SOURCE",
+ ((Attribute) attrList.get(0)).getValue());
+
+ Assert.assertNotNull(attrList.get(1));
+ Assert.assertEquals("Expected Value: EventReceivedCount", "EventReceivedCount",
+ ((Attribute) attrList.get(1)).getName());
+ Assert.assertEquals("Expected Value: 5", "5",
+ ((Attribute) attrList.get(1)).getValue().toString());
+
+ Assert.assertNotNull(attrList.get(2));
+ Assert.assertEquals("Expected Value: EventAcceptedCount", "EventAcceptedCount",
+ ((Attribute) attrList.get(2)).getName());
+ Assert.assertEquals("Expected Value: 5", "5",
+ ((Attribute) attrList.get(2)).getValue().toString());
+
+ } catch (Exception ex) {
+ System.out.println("Unable to retreive the monitored counter: "
+ + objName + ex.getMessage());
+ }
+ }
private void runTestShellCmdHelper(String shell, String command, String[] expectedOutput)
throws InterruptedException, LifecycleException, EventDeliveryException, IOException {