You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by df...@apache.org on 2013/03/05 10:40:13 UTC
[2/4] git commit: Add metrics reporting initialization
Add metrics reporting initialization
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/bfe0498e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/bfe0498e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/bfe0498e
Branch: refs/heads/dev
Commit: bfe0498ea2c0e4cb66a80ec5bdb87b19b9a908f9
Parents: 0634b5a
Author: Matthieu Morel <mm...@apache.org>
Authored: Mon Feb 25 18:49:39 2013 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Mon Feb 25 18:51:59 2013 +0100
----------------------------------------------------------------------
.../java/org/apache/s4/core/util/S4Metrics.java | 44 +++++++++++++++
1 files changed, 44 insertions(+), 0 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/bfe0498e/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
index 0b81bbe..bc6c808 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java
@@ -1,8 +1,11 @@
package org.apache.s4.core.util;
+import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.s4.base.Emitter;
import org.apache.s4.comm.topology.Assignment;
@@ -16,24 +19,35 @@ import org.slf4j.LoggerFactory;
import com.beust.jcommander.internal.Lists;
import com.beust.jcommander.internal.Maps;
+import com.google.common.base.Strings;
import com.google.common.cache.LoadingCache;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import com.google.inject.name.Named;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.reporting.ConsoleReporter;
+import com.yammer.metrics.reporting.CsvReporter;
@Singleton
public class S4Metrics {
private static Logger logger = LoggerFactory.getLogger(S4Metrics.class);
+ static final Pattern METRICS_CONFIG_PATTERN = Pattern
+ .compile("(csv:.+|console):(\\d+):(DAYS|HOURS|MICROSECONDS|MILLISECONDS|MINUTES|NANOSECONDS|SECONDS)");
+
@Inject
Emitter emitter;
@Inject
Assignment assignment;
+ @Inject(optional = true)
+ @Named("s4.metrics.config")
+ String metricsConfig;
+
static List<Meter> partitionSenderMeters = Lists.newArrayList();
private final Meter eventMeter = Metrics.newMeter(ReceiverImpl.class, "received-events", "event-count",
@@ -59,6 +73,36 @@ public class S4Metrics {
@Inject
private void init() {
+
+ if (Strings.isNullOrEmpty(metricsConfig)) {
+ logger.info("Metrics reporting not configured");
+ } else {
+ Matcher matcher = METRICS_CONFIG_PATTERN.matcher(metricsConfig);
+ if (!matcher.matches()) {
+ logger.error(
+ "Invalid metrics configuration [{}]. Metrics configuration must match the pattern [{}]. Metrics reporting disabled.",
+ metricsConfig, METRICS_CONFIG_PATTERN);
+ } else {
+ matcher.find();
+ String group1 = matcher.group(1);
+
+ if ("csv".equals(group1)) {
+ String outputDir = matcher.group(2);
+ long period = Long.valueOf(matcher.group(3));
+ TimeUnit timeUnit = TimeUnit.valueOf(matcher.group(4));
+ logger.info("Reporting metrics through csv files in directory [{}] with frequency of [{}] [{}]",
+ new String[] { outputDir, String.valueOf(period), timeUnit.name() });
+ CsvReporter.enable(new File(outputDir), period, timeUnit);
+ } else {
+ long period = Long.valueOf(matcher.group(2));
+ TimeUnit timeUnit = TimeUnit.valueOf(matcher.group(3));
+ logger.info("Reporting metrics on the console with frequency of [{}] [{}]",
+ new String[] { String.valueOf(period), timeUnit.name() });
+ ConsoleReporter.enable(period, timeUnit);
+ }
+ }
+ }
+
senderMeters = new Meter[emitter.getPartitionCount()];
// int localPartitionId = assignment.assignClusterNode().getPartition();
for (int i = 0; i < senderMeters.length; i++) {