You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2013/02/25 18:15:36 UTC

[2/2] git commit: Add metrics reporting initialization

Updated Branches:
  refs/heads/S4-86 [created] bfe0498ea


Add metrics reporting initialization


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/bfe0498e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/bfe0498e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/bfe0498e

Branch: refs/heads/S4-86
Commit: bfe0498ea2c0e4cb66a80ec5bdb87b19b9a908f9
Parents: 0634b5a
Author: Matthieu Morel <mm...@apache.org>
Authored: Mon Feb 25 18:49:39 2013 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Mon Feb 25 18:51:59 2013 +0100

----------------------------------------------------------------------
 .../java/org/apache/s4/core/util/S4Metrics.java    |   44 +++++++++++++++
 1 files changed, 44 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/bfe0498e/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
index 0b81bbe..bc6c808 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
@@ -1,8 +1,11 @@
 package org.apache.s4.core.util;
 
+import java.io.File;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.s4.base.Emitter;
 import org.apache.s4.comm.topology.Assignment;
@@ -16,24 +19,35 @@ import org.slf4j.LoggerFactory;
 
 import com.beust.jcommander.internal.Lists;
 import com.beust.jcommander.internal.Maps;
+import com.google.common.base.Strings;
 import com.google.common.cache.LoadingCache;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import com.google.inject.name.Named;
 import com.yammer.metrics.Metrics;
 import com.yammer.metrics.core.Gauge;
 import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.reporting.ConsoleReporter;
+import com.yammer.metrics.reporting.CsvReporter;
 
 @Singleton
 public class S4Metrics {
 
     private static Logger logger = LoggerFactory.getLogger(S4Metrics.class);
 
+    static final Pattern METRICS_CONFIG_PATTERN = Pattern
+            .compile("(csv:.+|console):(\\d+):(DAYS|HOURS|MICROSECONDS|MILLISECONDS|MINUTES|NANOSECONDS|SECONDS)");
+
     @Inject
     Emitter emitter;
 
     @Inject
     Assignment assignment;
 
+    @Inject(optional = true)
+    @Named("s4.metrics.config")
+    String metricsConfig;
+
     static List<Meter> partitionSenderMeters = Lists.newArrayList();
 
     private final Meter eventMeter = Metrics.newMeter(ReceiverImpl.class, "received-events", "event-count",
@@ -59,6 +73,36 @@ public class S4Metrics {
 
     @Inject
     private void init() {
+
+        // Optional metrics reporting: "csv:<directory>:<period>:<unit>" or "console:<period>:<unit>".
+        if (Strings.isNullOrEmpty(metricsConfig)) {
+            logger.info("Metrics reporting not configured");
+        } else {
+            Matcher matcher = METRICS_CONFIG_PATTERN.matcher(metricsConfig);
+            if (!matcher.matches()) {
+                logger.error(
+                        "Invalid metrics configuration [{}]. Metrics configuration must match the pattern [{}]. Metrics reporting disabled.",
+                        metricsConfig, METRICS_CONFIG_PATTERN);
+            } else {
+                // matches() already populated the groups; calling find() afterwards would discard them.
+                // Group 1 is "console" or "csv:<directory>", group 2 the period, group 3 the time unit.
+                String reporter = matcher.group(1);
+                long period = Long.parseLong(matcher.group(2));
+                TimeUnit timeUnit = TimeUnit.valueOf(matcher.group(3));
+
+                if (reporter.startsWith("csv:")) {
+                    String outputDir = reporter.substring("csv:".length());
+                    logger.info("Reporting metrics through csv files in directory [{}] with frequency of [{}] [{}]",
+                            new String[] { outputDir, String.valueOf(period), timeUnit.name() });
+                    CsvReporter.enable(new File(outputDir), period, timeUnit);
+                } else {
+                    logger.info("Reporting metrics on the console with frequency of [{}] [{}]",
+                            new String[] { String.valueOf(period), timeUnit.name() });
+                    ConsoleReporter.enable(period, timeUnit);
+                }
+            }
+        }
+
         senderMeters = new Meter[emitter.getPartitionCount()];
         // int localPartitionId = assignment.assignClusterNode().getPartition();
         for (int i = 0; i < senderMeters.length; i++) {