You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2013/10/16 23:25:09 UTC
git commit: FLUME-2213. MorphlineInterceptor should share metric registry across threads for better (aggregate) reporting
Updated Branches:
refs/heads/trunk 68fe4d451 -> c420fad5d
FLUME-2213. MorphlineInterceptor should share metric registry across threads for better (aggregate) reporting
(Wolfgang Hoschek via Hari Shreedharan)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c420fad5
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c420fad5
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c420fad5
Branch: refs/heads/trunk
Commit: c420fad5d03dc8d17dce7fe3e59bf3b742f3d22d
Parents: 68fe4d4
Author: Hari Shreedharan <hs...@apache.org>
Authored: Wed Oct 16 14:24:13 2013 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Wed Oct 16 14:24:13 2013 -0700
----------------------------------------------------------------------
.../solr/morphline/MorphlineHandlerImpl.java | 67 ++++++++++++++------
1 file changed, 47 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/c420fad5/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java
index ea76322..cb88dc2 100644
--- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java
@@ -31,8 +31,12 @@ import com.cloudera.cdk.morphline.api.Record;
import com.cloudera.cdk.morphline.base.Compiler;
import com.cloudera.cdk.morphline.base.FaultTolerance;
import com.cloudera.cdk.morphline.base.Fields;
+import com.cloudera.cdk.morphline.base.Metrics;
import com.cloudera.cdk.morphline.base.Notifications;
+import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.SharedMetricRegistries;
+import com.codahale.metrics.Timer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
@@ -46,6 +50,11 @@ public class MorphlineHandlerImpl implements MorphlineHandler {
private Command finalChild;
private String morphlineFileAndId;
+ private Timer mappingTimer;
+ private Meter numRecords;
+ private Meter numFailedRecords;
+ private Meter numExceptionRecords;
+
public static final String MORPHLINE_FILE_PARAM = "morphlineFile";
public static final String MORPHLINE_ID_PARAM = "morphlineId";
@@ -69,6 +78,13 @@ public class MorphlineHandlerImpl implements MorphlineHandler {
@Override
public void configure(Context context) {
+ String morphlineFile = context.getString(MORPHLINE_FILE_PARAM);
+ String morphlineId = context.getString(MORPHLINE_ID_PARAM);
+ if (morphlineFile == null || morphlineFile.trim().length() == 0) {
+ throw new MorphlineCompilationException("Missing parameter: " + MORPHLINE_FILE_PARAM, null);
+ }
+ morphlineFileAndId = morphlineFile + "@" + morphlineId;
+
if (morphlineContext == null) {
FaultTolerance faultTolerance = new FaultTolerance(
context.getBoolean(FaultTolerance.IS_PRODUCTION_MODE, false),
@@ -77,37 +93,48 @@ public class MorphlineHandlerImpl implements MorphlineHandler {
morphlineContext = new MorphlineContext.Builder()
.setExceptionHandler(faultTolerance)
- .setMetricRegistry(new MetricRegistry())
+ .setMetricRegistry(SharedMetricRegistries.getOrCreate(morphlineFileAndId))
.build();
}
- String morphlineFile = context.getString(MORPHLINE_FILE_PARAM);
- String morphlineId = context.getString(MORPHLINE_ID_PARAM);
- if (morphlineFile == null || morphlineFile.trim().length() == 0) {
- throw new MorphlineCompilationException("Missing parameter: " + MORPHLINE_FILE_PARAM, null);
- }
Config override = ConfigFactory.parseMap(context.getSubProperties(MORPHLINE_VARIABLE_PARAM + "."));
morphline = new Compiler().compile(new File(morphlineFile), morphlineId, morphlineContext, finalChild, override);
- morphlineFileAndId = morphlineFile + "@" + morphlineId;
+
+ this.mappingTimer = morphlineContext.getMetricRegistry().timer(
+ MetricRegistry.name("morphline.app", Metrics.ELAPSED_TIME));
+ this.numRecords = morphlineContext.getMetricRegistry().meter(
+ MetricRegistry.name("morphline.app", Metrics.NUM_RECORDS));
+ this.numFailedRecords = morphlineContext.getMetricRegistry().meter(
+ MetricRegistry.name("morphline.app", "numFailedRecords"));
+ this.numExceptionRecords = morphlineContext.getMetricRegistry().meter(
+ MetricRegistry.name("morphline.app", "numExceptionRecords"));
}
@Override
public void process(Event event) {
- Record record = new Record();
- for (Entry<String, String> entry : event.getHeaders().entrySet()) {
- record.put(entry.getKey(), entry.getValue());
- }
- byte[] bytes = event.getBody();
- if (bytes != null && bytes.length > 0) {
- record.put(Fields.ATTACHMENT_BODY, bytes);
- }
+ numRecords.mark();
+ Timer.Context timerContext = mappingTimer.time();
try {
- Notifications.notifyStartSession(morphline);
- if (!morphline.process(record)) {
- LOG.warn("Morphline {} failed to process record: {}", morphlineFileAndId, record);
+ Record record = new Record();
+ for (Entry<String, String> entry : event.getHeaders().entrySet()) {
+ record.put(entry.getKey(), entry.getValue());
+ }
+ byte[] bytes = event.getBody();
+ if (bytes != null && bytes.length > 0) {
+ record.put(Fields.ATTACHMENT_BODY, bytes);
+ }
+ try {
+ Notifications.notifyStartSession(morphline);
+ if (!morphline.process(record)) {
+ numFailedRecords.mark();
+ LOG.warn("Morphline {} failed to process record: {}", morphlineFileAndId, record);
+ }
+ } catch (RuntimeException t) {
+ numExceptionRecords.mark();
+ morphlineContext.getExceptionHandler().handleException(t, record);
}
- } catch (RuntimeException t) {
- morphlineContext.getExceptionHandler().handleException(t, record);
+ } finally {
+ timerContext.stop();
}
}