You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2013/10/16 23:25:09 UTC

git commit: FLUME-2213. MorphlineInterceptor should share metric registry across threads for better (aggregate) reporting

Updated Branches:
  refs/heads/trunk 68fe4d451 -> c420fad5d


FLUME-2213. MorphlineInterceptor should share metric registry across threads for better (aggregate) reporting

(Wolfgang Hoschek via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c420fad5
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c420fad5
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c420fad5

Branch: refs/heads/trunk
Commit: c420fad5d03dc8d17dce7fe3e59bf3b742f3d22d
Parents: 68fe4d4
Author: Hari Shreedharan <hs...@apache.org>
Authored: Wed Oct 16 14:24:13 2013 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Wed Oct 16 14:24:13 2013 -0700

----------------------------------------------------------------------
 .../solr/morphline/MorphlineHandlerImpl.java    | 67 ++++++++++++++------
 1 file changed, 47 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/c420fad5/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java
index ea76322..cb88dc2 100644
--- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java
+++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/main/java/org/apache/flume/sink/solr/morphline/MorphlineHandlerImpl.java
@@ -31,8 +31,12 @@ import com.cloudera.cdk.morphline.api.Record;
 import com.cloudera.cdk.morphline.base.Compiler;
 import com.cloudera.cdk.morphline.base.FaultTolerance;
 import com.cloudera.cdk.morphline.base.Fields;
+import com.cloudera.cdk.morphline.base.Metrics;
 import com.cloudera.cdk.morphline.base.Notifications;
+import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.SharedMetricRegistries;
+import com.codahale.metrics.Timer;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
@@ -46,6 +50,11 @@ public class MorphlineHandlerImpl implements MorphlineHandler {
   private Command finalChild;
   private String morphlineFileAndId;
   
+  private Timer mappingTimer;
+  private Meter numRecords;
+  private Meter numFailedRecords;
+  private Meter numExceptionRecords;
+  
   public static final String MORPHLINE_FILE_PARAM = "morphlineFile";
   public static final String MORPHLINE_ID_PARAM = "morphlineId";
   
@@ -69,6 +78,13 @@ public class MorphlineHandlerImpl implements MorphlineHandler {
 
   @Override
   public void configure(Context context) {
+    String morphlineFile = context.getString(MORPHLINE_FILE_PARAM);
+    String morphlineId = context.getString(MORPHLINE_ID_PARAM);
+    if (morphlineFile == null || morphlineFile.trim().length() == 0) {
+      throw new MorphlineCompilationException("Missing parameter: " + MORPHLINE_FILE_PARAM, null);
+    }
+    morphlineFileAndId = morphlineFile + "@" + morphlineId;
+    
     if (morphlineContext == null) {
       FaultTolerance faultTolerance = new FaultTolerance(
           context.getBoolean(FaultTolerance.IS_PRODUCTION_MODE, false), 
@@ -77,37 +93,48 @@ public class MorphlineHandlerImpl implements MorphlineHandler {
       
       morphlineContext = new MorphlineContext.Builder()
         .setExceptionHandler(faultTolerance)
-        .setMetricRegistry(new MetricRegistry())
+        .setMetricRegistry(SharedMetricRegistries.getOrCreate(morphlineFileAndId))
         .build();
     }
     
-    String morphlineFile = context.getString(MORPHLINE_FILE_PARAM);
-    String morphlineId = context.getString(MORPHLINE_ID_PARAM);
-    if (morphlineFile == null || morphlineFile.trim().length() == 0) {
-      throw new MorphlineCompilationException("Missing parameter: " + MORPHLINE_FILE_PARAM, null);
-    }
     Config override = ConfigFactory.parseMap(context.getSubProperties(MORPHLINE_VARIABLE_PARAM + "."));
     morphline = new Compiler().compile(new File(morphlineFile), morphlineId, morphlineContext, finalChild, override);      
-    morphlineFileAndId = morphlineFile + "@" + morphlineId;
+    
+    this.mappingTimer = morphlineContext.getMetricRegistry().timer(
+        MetricRegistry.name("morphline.app", Metrics.ELAPSED_TIME));
+    this.numRecords = morphlineContext.getMetricRegistry().meter(
+        MetricRegistry.name("morphline.app", Metrics.NUM_RECORDS));
+    this.numFailedRecords = morphlineContext.getMetricRegistry().meter(
+        MetricRegistry.name("morphline.app", "numFailedRecords"));
+    this.numExceptionRecords = morphlineContext.getMetricRegistry().meter(
+        MetricRegistry.name("morphline.app", "numExceptionRecords"));
   }
 
   @Override
   public void process(Event event) {
-    Record record = new Record();
-    for (Entry<String, String> entry : event.getHeaders().entrySet()) {
-      record.put(entry.getKey(), entry.getValue());
-    }
-    byte[] bytes = event.getBody();
-    if (bytes != null && bytes.length > 0) {
-      record.put(Fields.ATTACHMENT_BODY, bytes);
-    }    
+    numRecords.mark();
+    Timer.Context timerContext = mappingTimer.time();
     try {
-      Notifications.notifyStartSession(morphline);
-      if (!morphline.process(record)) {
-        LOG.warn("Morphline {} failed to process record: {}", morphlineFileAndId, record);
+      Record record = new Record();
+      for (Entry<String, String> entry : event.getHeaders().entrySet()) {
+        record.put(entry.getKey(), entry.getValue());
+      }
+      byte[] bytes = event.getBody();
+      if (bytes != null && bytes.length > 0) {
+        record.put(Fields.ATTACHMENT_BODY, bytes);
+      }    
+      try {
+        Notifications.notifyStartSession(morphline);
+        if (!morphline.process(record)) {
+          numFailedRecords.mark();
+          LOG.warn("Morphline {} failed to process record: {}", morphlineFileAndId, record);
+        }
+      } catch (RuntimeException t) {
+        numExceptionRecords.mark();
+        morphlineContext.getExceptionHandler().handleException(t, record);
       }
-    } catch (RuntimeException t) {
-      morphlineContext.getExceptionHandler().handleException(t, record);
+    } finally {
+      timerContext.stop();
     }
   }