You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2013/06/21 21:14:01 UTC
git commit: FLUME-2081. JMX metrics support for SpoolDir.
Updated Branches:
refs/heads/trunk b455486c2 -> 862c83187
FLUME-2081. JMX metrics support for SpoolDir.
(Sravya Tirukkovalur via Mike Percy)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/862c8318
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/862c8318
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/862c8318
Branch: refs/heads/trunk
Commit: 862c83187a1ea443142b49df8e86671e55d927d6
Parents: b455486
Author: Mike Percy <mp...@apache.org>
Authored: Fri Jun 21 12:12:40 2013 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Fri Jun 21 12:12:40 2013 -0700
----------------------------------------------------------------------
.../flume/source/SpoolDirectorySource.java | 25 +++++++++++++-------
1 file changed, 17 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/862c8318/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
index 641b5c6..7145580 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.flume.*;
import org.apache.flume.client.avro.ReliableSpoolingFileEventReader;
import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.serialization.LineDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +59,7 @@ Configurable, EventDrivenSource {
private String deletePolicy;
private String inputCharset;
- private CounterGroup counterGroup;
+ private SourceCounter sourceCounter;
ReliableSpoolingFileEventReader reader;
@Override
@@ -68,7 +69,6 @@ Configurable, EventDrivenSource {
ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
- counterGroup = new CounterGroup();
File directory = new File(spoolDirectory);
try {
@@ -89,17 +89,21 @@ Configurable, EventDrivenSource {
ioe);
}
- Runnable runner = new SpoolDirectoryRunnable(reader, counterGroup);
+ Runnable runner = new SpoolDirectoryRunnable(reader, sourceCounter);
executor.scheduleWithFixedDelay(
runner, 0, POLL_DELAY_MS, TimeUnit.MILLISECONDS);
super.start();
logger.debug("SpoolDirectorySource source started");
+ sourceCounter.start();
}
@Override
public void stop() {
super.stop();
+ sourceCounter.stop();
+ logger.info("SpoolDir source {} stopped. Metrics: {}", getName(),
+ sourceCounter);
}
@Override
@@ -134,17 +138,19 @@ Configurable, EventDrivenSource {
deserializerContext.put(LineDeserializer.MAXLINE_KEY,
bufferMaxLineLength.toString());
}
-
+ if (sourceCounter == null) {
+ sourceCounter = new SourceCounter(getName());
+ }
}
private class SpoolDirectoryRunnable implements Runnable {
private ReliableSpoolingFileEventReader reader;
- private CounterGroup counterGroup;
+ private SourceCounter sourceCounter;
public SpoolDirectoryRunnable(ReliableSpoolingFileEventReader reader,
- CounterGroup counterGroup) {
+ SourceCounter sourceCounter) {
this.reader = reader;
- this.counterGroup = counterGroup;
+ this.sourceCounter = sourceCounter;
}
@Override
@@ -155,10 +161,13 @@ Configurable, EventDrivenSource {
if (events.isEmpty()) {
break;
}
- counterGroup.addAndGet("spooler.events.read", (long) events.size());
+ sourceCounter.addToEventReceivedCount(events.size());
+ sourceCounter.incrementAppendBatchReceivedCount();
getChannelProcessor().processEventBatch(events);
reader.commit();
+ sourceCounter.addToEventAcceptedCount(events.size());
+ sourceCounter.incrementAppendBatchAcceptedCount();
}
} catch (Throwable t) {
logger.error("Uncaught exception in Runnable", t);