You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ca...@apache.org on 2021/05/07 20:51:18 UTC

[samza] branch master updated: SAMZA-2649: Add MetricsReporter which logs metrics to log file (#1495)

This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 96b09b5  SAMZA-2649: Add MetricsReporter which logs metrics to log file (#1495)
96b09b5 is described below

commit 96b09b5b0ba9172754a5df526cd4d9699b6fde96
Author: Cameron Lee <ca...@linkedin.com>
AuthorDate: Fri May 7 13:51:12 2021 -0700

    SAMZA-2649: Add MetricsReporter which logs metrics to log file (#1495)
    
    Issues: In environments without an external metrics system, it can sometimes be hard to access any metrics from a Samza job. It can be useful to have a MetricsReporter which logs metrics so that they can be accessed in a simple way with minimal dependencies. This can be used to help verify certain flows are working. An example case for using this is when testing Samza on a bare-bones Kubernetes cluster in minikube.
    
    Changes: Added LoggingMetricsReporter which periodically logs metrics to a log file.
    
    API changes and usage/upgrade instructions:
    Add a new metrics reporter in the config which uses org.apache.samza.metrics.reporter.LoggingMetricsReporterFactory. See https://github.com/apache/samza/pull/1495 for an example of configuration.
---
 .../metrics/reporter/LoggingMetricsReporter.java   | 132 ++++++++++++++
 .../reporter/LoggingMetricsReporterConfig.java     |  46 +++++
 .../reporter/LoggingMetricsReporterFactory.java    |  45 +++++
 .../reporter/TestLoggingMetricsReporter.java       | 198 +++++++++++++++++++++
 .../reporter/TestLoggingMetricsReporterConfig.java |  54 ++++++
 5 files changed, 475 insertions(+)

diff --git a/samza-core/src/main/java/org/apache/samza/metrics/reporter/LoggingMetricsReporter.java b/samza-core/src/main/java/org/apache/samza/metrics/reporter/LoggingMetricsReporter.java
new file mode 100644
index 0000000..5961d7a
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/metrics/reporter/LoggingMetricsReporter.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics.reporter;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.Metric;
+import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.metrics.MetricsVisitor;
+import org.apache.samza.metrics.ReadableMetricsRegistry;
+import org.apache.samza.metrics.Timer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Implementation of {@link MetricsReporter} which logs metrics which match a regex.
+ * The regex is checked against "[source name]-[group name]-[metric name]".
+ */
+public class LoggingMetricsReporter implements MetricsReporter {
+  private static final Logger LOG = LoggerFactory.getLogger(LoggingMetricsReporter.class);
+  /**
+   * First part is source, second part is group name, third part is metric name
+   */
+  private static final String FULL_METRIC_FORMAT = "%s-%s-%s";
+
+  private final ScheduledExecutorService scheduledExecutorService;
+  private final Pattern metricsToLog;
+  private final long loggingIntervalSeconds;
+  private final Queue<Runnable> loggingTasks = new ConcurrentLinkedQueue<>();
+
+  /**
+   * @param scheduledExecutorService executes the logging tasks
+   * @param metricsToLog Only log the metrics which match this regex. The strings for matching against this metric are
+   *                     constructed by concatenating source name, group name, and metric name, delimited by dashes.
+   * @param loggingIntervalSeconds interval at which to log metrics
+   */
+  public LoggingMetricsReporter(ScheduledExecutorService scheduledExecutorService, Pattern metricsToLog,
+      long loggingIntervalSeconds) {
+    this.scheduledExecutorService = scheduledExecutorService;
+    this.metricsToLog = metricsToLog;
+    this.loggingIntervalSeconds = loggingIntervalSeconds;
+  }
+
+  @Override
+  public void start() {
+    this.scheduledExecutorService.scheduleAtFixedRate(() -> this.loggingTasks.forEach(Runnable::run),
+        this.loggingIntervalSeconds, this.loggingIntervalSeconds, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public void register(String source, ReadableMetricsRegistry registry) {
+    this.loggingTasks.add(buildLoggingTask(source, registry));
+  }
+
+  @Override
+  public void stop() {
+    this.scheduledExecutorService.shutdown();
+    try {
+      this.scheduledExecutorService.awaitTermination(10, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while shutting down executor", e);
+    }
+    if (!this.scheduledExecutorService.isTerminated()) {
+      LOG.warn("Unable to shutdown executor");
+    }
+  }
+
+  /**
+   * VisibleForTesting so that the logging call can be verified in unit tests.
+   */
+  @VisibleForTesting
+  void doLog(String logString) {
+    LOG.info(logString);
+  }
+
+  private Runnable buildLoggingTask(String source, ReadableMetricsRegistry registry) {
+    return () -> {
+      for (String group : registry.getGroups()) {
+        for (Map.Entry<String, Metric> metricGroupEntry : registry.getGroup(group).entrySet()) {
+          metricGroupEntry.getValue().visit(new MetricsVisitor() {
+            @Override
+            public void counter(Counter counter) {
+              logMetric(source, group, counter.getName(), counter.getCount());
+            }
+
+            @Override
+            public <T> void gauge(Gauge<T> gauge) {
+              logMetric(source, group, gauge.getName(), gauge.getValue());
+            }
+
+            @Override
+            public void timer(Timer timer) {
+              logMetric(source, group, timer.getName(), timer.getSnapshot().getAverage());
+            }
+          });
+        }
+      }
+    };
+  }
+
+  private <T> void logMetric(String source, String group, String metricName, T value) {
+    String fullMetricName = String.format(FULL_METRIC_FORMAT, source, group, metricName);
+    if (this.metricsToLog.matcher(fullMetricName).matches()) {
+      doLog(String.format("Metric: %s, Value: %s", fullMetricName, value));
+    }
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/metrics/reporter/LoggingMetricsReporterConfig.java b/samza-core/src/main/java/org/apache/samza/metrics/reporter/LoggingMetricsReporterConfig.java
new file mode 100644
index 0000000..e7a256f
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/metrics/reporter/LoggingMetricsReporterConfig.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics.reporter;
+
+import java.util.Optional;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.MapConfig;
+
+
+/**
+ * Accessors for the configs of a {@link LoggingMetricsReporter}. Every config key contains the reporter name.
+ */
+public class LoggingMetricsReporterConfig extends MapConfig {
+  private static final long LOGGING_INTERVAL_SECONDS_DEFAULT = 60;
+  private static final String LOGGING_INTERVAL_SECONDS_CONFIG = "metrics.reporter.%s.logging.interval.seconds";
+  private static final String METRICS_TO_LOG_REGEX_CONFIG = "metrics.reporter.%s.log.regex";
+
+  public LoggingMetricsReporterConfig(Config config) {
+    super(config);
+  }
+
+  /**
+   * @param reporterName name of the reporter, substituted into the config key
+   * @return the regex string configured for the reporter
+   * @throws ConfigException if there is no value for the reporter's regex key
+   */
+  public String getMetricsToLogRegex(String reporterName) {
+    String regexKey = String.format(METRICS_TO_LOG_REGEX_CONFIG, reporterName);
+    Optional<String> regex = Optional.ofNullable(get(regexKey));
+    if (!regex.isPresent()) {
+      throw new ConfigException("Missing value for " + regexKey);
+    }
+    return regex.get();
+  }
+
+  /**
+   * @param reporterName name of the reporter, substituted into the config key
+   * @return the logging interval in seconds configured for the reporter, or 60 if it is not configured
+   */
+  public long getLoggingIntervalSeconds(String reporterName) {
+    String intervalKey = String.format(LOGGING_INTERVAL_SECONDS_CONFIG, reporterName);
+    return getLong(intervalKey, LOGGING_INTERVAL_SECONDS_DEFAULT);
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/metrics/reporter/LoggingMetricsReporterFactory.java b/samza-core/src/main/java/org/apache/samza/metrics/reporter/LoggingMetricsReporterFactory.java
new file mode 100644
index 0000000..88d3892
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/metrics/reporter/LoggingMetricsReporterFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics.reporter;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.regex.Pattern;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.metrics.MetricsReporterFactory;
+
+
+/**
+ * Creates a {@link MetricsReporter} which logs metrics and their values.
+ * This can be used to access metric values when no other external metrics system is available.
+ */
+public class LoggingMetricsReporterFactory implements MetricsReporterFactory {
+  @Override
+  public MetricsReporter getMetricsReporter(String name, String processorId, Config config) {
+    ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
+        new ThreadFactoryBuilder().setNameFormat("Samza LoggingMetricsReporter Thread-%d").setDaemon(true).build());
+    LoggingMetricsReporterConfig loggingMetricsReporterConfig = new LoggingMetricsReporterConfig(config);
+    Pattern metricsToLog = Pattern.compile(loggingMetricsReporterConfig.getMetricsToLogRegex(name));
+    return new LoggingMetricsReporter(scheduledExecutorService, metricsToLog,
+        loggingMetricsReporterConfig.getLoggingIntervalSeconds(name));
+  }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestLoggingMetricsReporter.java b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestLoggingMetricsReporter.java
new file mode 100644
index 0000000..c1f5972
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestLoggingMetricsReporter.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics.reporter;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import com.google.common.collect.ImmutableMap;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.Metric;
+import org.apache.samza.metrics.MetricsVisitor;
+import org.apache.samza.metrics.ReadableMetricsRegistry;
+import org.apache.samza.metrics.Snapshot;
+import org.apache.samza.metrics.Timer;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unit tests for {@link LoggingMetricsReporter}. The reporter is wrapped in a spy so that the strings passed to
+ * {@code doLog} can be verified directly.
+ */
+public class TestLoggingMetricsReporter {
+  private static final String SOURCE_NAME = "source_name";
+  private static final String GROUP_NAME = "group_name";
+  private static final String COUNTER_NAME = "counter_name";
+  private static final String GAUGE_NAME = "gauge_name";
+  private static final String TIMER_NAME = "timer_name";
+  private static final long COUNTER_VALUE = 10;
+  private static final double GAUGE_VALUE = 20.0;
+  private static final double TIMER_VALUE = 30.0;
+  private static final long LOGGING_INTERVAL_SECONDS = 15;
+  private static final Pattern DEFAULT_PATTERN = Pattern.compile(".*_name");
+  private static final String EXPECTED_COUNTER_LOG = "Metric: source_name-group_name-counter_name, Value: 10";
+  private static final String EXPECTED_GAUGE_LOG = "Metric: source_name-group_name-gauge_name, Value: 20.0";
+  private static final String EXPECTED_TIMER_LOG = "Metric: source_name-group_name-timer_name, Value: 30.0";
+
+  @Mock
+  private ScheduledExecutorService scheduledExecutorService;
+  @Mock
+  private ReadableMetricsRegistry readableMetricsRegistry;
+  @Mock
+  private Counter counter;
+  @Mock
+  private Gauge<Double> gauge;
+  @Mock
+  private Timer timer;
+  @Mock
+  private Snapshot timerSnapshot;
+
+  private LoggingMetricsReporter loggingMetricsReporter;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    stubExecutorToRunTaskWhenScheduled();
+    stubCounter();
+    stubGauge();
+    stubTimer();
+    this.loggingMetricsReporter = spy(newReporter(DEFAULT_PATTERN));
+  }
+
+  /**
+   * A counter, a gauge and a timer in the same group should each produce one log line.
+   */
+  @Test
+  public void testMetricTypes() {
+    Map<String, Metric> metrics =
+        ImmutableMap.of(COUNTER_NAME, this.counter, GAUGE_NAME, this.gauge, TIMER_NAME, this.timer);
+    stubSingleGroup(this.readableMetricsRegistry, GROUP_NAME, metrics);
+
+    this.loggingMetricsReporter.register(SOURCE_NAME, this.readableMetricsRegistry);
+    this.loggingMetricsReporter.start();
+
+    verify(this.loggingMetricsReporter).doLog(EXPECTED_COUNTER_LOG);
+    verify(this.loggingMetricsReporter).doLog(EXPECTED_GAUGE_LOG);
+    verify(this.loggingMetricsReporter).doLog(EXPECTED_TIMER_LOG);
+  }
+
+  /**
+   * Metrics from two separately registered sources should both be logged.
+   */
+  @Test
+  public void testMultipleRegister() {
+    Map<String, Metric> counterOnly = ImmutableMap.of(COUNTER_NAME, this.counter);
+    stubSingleGroup(this.readableMetricsRegistry, GROUP_NAME, counterOnly);
+    ReadableMetricsRegistry otherRegistry = mock(ReadableMetricsRegistry.class);
+    Map<String, Metric> gaugeOnly = ImmutableMap.of(GAUGE_NAME, this.gauge);
+    stubSingleGroup(otherRegistry, "other_group", gaugeOnly);
+
+    this.loggingMetricsReporter.register(SOURCE_NAME, this.readableMetricsRegistry);
+    this.loggingMetricsReporter.register("other_source", otherRegistry);
+    this.loggingMetricsReporter.start();
+
+    verify(this.loggingMetricsReporter).doLog(EXPECTED_COUNTER_LOG);
+    verify(this.loggingMetricsReporter).doLog("Metric: other_source-other_group-gauge_name, Value: 20.0");
+  }
+
+  /**
+   * With a regex that only matches the counter, the gauge should not be logged.
+   */
+  @Test
+  public void testFiltering() {
+    this.loggingMetricsReporter = spy(newReporter(Pattern.compile(".*counter.*")));
+    Map<String, Metric> metrics = ImmutableMap.of(COUNTER_NAME, this.counter, GAUGE_NAME, this.gauge);
+    stubSingleGroup(this.readableMetricsRegistry, GROUP_NAME, metrics);
+
+    this.loggingMetricsReporter.register(SOURCE_NAME, this.readableMetricsRegistry);
+    this.loggingMetricsReporter.start();
+
+    ArgumentCaptor<String> loggedLines = ArgumentCaptor.forClass(String.class);
+    verify(this.loggingMetricsReporter).doLog(loggedLines.capture());
+    assertEquals(Collections.singletonList(EXPECTED_COUNTER_LOG), loggedLines.getAllValues());
+  }
+
+  /**
+   * A metric which shows up in the registry after registration should be logged by later runs of the task.
+   */
+  @Test
+  public void testNewMetricsAfterRegister() {
+    when(this.readableMetricsRegistry.getGroups()).thenReturn(Collections.singleton(GROUP_NAME));
+    // the first read of the group sees the counter only; later reads see the counter and the gauge
+    Map<String, Metric> firstRound = ImmutableMap.of(COUNTER_NAME, this.counter);
+    Map<String, Metric> laterRounds = ImmutableMap.of(COUNTER_NAME, this.counter, GAUGE_NAME, this.gauge);
+    when(this.readableMetricsRegistry.getGroup(GROUP_NAME)).thenReturn(firstRound).thenReturn(laterRounds);
+
+    // replace the default stubbing: capture the scheduled task instead of running it, so the test drives each run
+    ArgumentCaptor<Runnable> taskCaptor = ArgumentCaptor.forClass(Runnable.class);
+    when(this.scheduledExecutorService.scheduleAtFixedRate(taskCaptor.capture(), eq(LOGGING_INTERVAL_SECONDS),
+        eq(LOGGING_INTERVAL_SECONDS), eq(TimeUnit.SECONDS))).thenReturn(null);
+
+    this.loggingMetricsReporter.register(SOURCE_NAME, this.readableMetricsRegistry);
+    this.loggingMetricsReporter.start();
+    Runnable scheduledTask = taskCaptor.getValue();
+
+    // first scheduled execution: only the counter exists
+    scheduledTask.run();
+    verify(this.loggingMetricsReporter).doLog(EXPECTED_COUNTER_LOG);
+    verify(this.loggingMetricsReporter, never()).doLog(EXPECTED_GAUGE_LOG);
+
+    // second scheduled execution: counter is logged a second time, gauge for the first time
+    scheduledTask.run();
+    verify(this.loggingMetricsReporter, times(2)).doLog(EXPECTED_COUNTER_LOG);
+    verify(this.loggingMetricsReporter).doLog(EXPECTED_GAUGE_LOG);
+  }
+
+  private LoggingMetricsReporter newReporter(Pattern metricsToLog) {
+    return new LoggingMetricsReporter(this.scheduledExecutorService, metricsToLog, LOGGING_INTERVAL_SECONDS);
+  }
+
+  /**
+   * Makes the mocked executor run the task once, on the calling thread, at the moment it gets scheduled.
+   */
+  private void stubExecutorToRunTaskWhenScheduled() {
+    when(this.scheduledExecutorService.scheduleAtFixedRate(any(), eq(LOGGING_INTERVAL_SECONDS),
+        eq(LOGGING_INTERVAL_SECONDS), eq(TimeUnit.SECONDS))).thenAnswer((Answer<Void>) invocation -> {
+          Runnable scheduledTask = invocation.getArgumentAt(0, Runnable.class);
+          scheduledTask.run();
+          return null;
+        });
+  }
+
+  private void stubCounter() {
+    when(this.counter.getName()).thenReturn(COUNTER_NAME);
+    when(this.counter.getCount()).thenReturn(COUNTER_VALUE);
+    doAnswer(invocation -> {
+      MetricsVisitor visitor = invocation.getArgumentAt(0, MetricsVisitor.class);
+      visitor.counter(this.counter);
+      return null;
+    }).when(this.counter).visit(any());
+  }
+
+  private void stubGauge() {
+    when(this.gauge.getName()).thenReturn(GAUGE_NAME);
+    when(this.gauge.getValue()).thenReturn(GAUGE_VALUE);
+    doAnswer(invocation -> {
+      MetricsVisitor visitor = invocation.getArgumentAt(0, MetricsVisitor.class);
+      visitor.gauge(this.gauge);
+      return null;
+    }).when(this.gauge).visit(any());
+  }
+
+  private void stubTimer() {
+    when(this.timerSnapshot.getAverage()).thenReturn(TIMER_VALUE);
+    when(this.timer.getName()).thenReturn(TIMER_NAME);
+    when(this.timer.getSnapshot()).thenReturn(this.timerSnapshot);
+    doAnswer(invocation -> {
+      MetricsVisitor visitor = invocation.getArgumentAt(0, MetricsVisitor.class);
+      visitor.timer(this.timer);
+      return null;
+    }).when(this.timer).visit(any());
+  }
+
+  /**
+   * Stubs the registry to expose exactly one group which contains the given metrics.
+   */
+  private static void stubSingleGroup(ReadableMetricsRegistry registry, String groupName,
+      Map<String, Metric> metrics) {
+    when(registry.getGroups()).thenReturn(Collections.singleton(groupName));
+    when(registry.getGroup(groupName)).thenReturn(metrics);
+  }
+}
\ No newline at end of file
diff --git a/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestLoggingMetricsReporterConfig.java b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestLoggingMetricsReporterConfig.java
new file mode 100644
index 0000000..40b44e7
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/metrics/reporter/TestLoggingMetricsReporterConfig.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.metrics.reporter;
+
+import java.util.Map;
+import com.google.common.collect.ImmutableMap;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.MapConfig;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Unit tests for {@link LoggingMetricsReporterConfig}.
+ */
+public class TestLoggingMetricsReporterConfig {
+  private static final String REPORTER_NAME = "reporter_name";
+
+  /**
+   * The regex is read from the key which contains the reporter name.
+   */
+  @Test
+  public void testGetMetricsToLogRegex() {
+    Map<String, String> configMap = ImmutableMap.of("metrics.reporter.reporter_name.log.regex", ".*metric.*");
+    LoggingMetricsReporterConfig reporterConfig = new LoggingMetricsReporterConfig(new MapConfig(configMap));
+
+    assertEquals(".*metric.*", reporterConfig.getMetricsToLogRegex(REPORTER_NAME));
+  }
+
+  /**
+   * A missing regex config is an error.
+   */
+  @Test(expected = ConfigException.class)
+  public void testGetMetricsToLogRegexMissing() {
+    LoggingMetricsReporterConfig emptyConfig = new LoggingMetricsReporterConfig(new MapConfig());
+
+    emptyConfig.getMetricsToLogRegex(REPORTER_NAME);
+  }
+
+  /**
+   * The interval falls back to 60 seconds when it is not configured, and uses the configured value otherwise.
+   */
+  @Test
+  public void testGetLoggingIntervalSeconds() {
+    LoggingMetricsReporterConfig emptyConfig = new LoggingMetricsReporterConfig(new MapConfig());
+    assertEquals(60, emptyConfig.getLoggingIntervalSeconds(REPORTER_NAME));
+
+    Map<String, String> configMap = ImmutableMap.of("metrics.reporter.reporter_name.logging.interval.seconds", "100");
+    LoggingMetricsReporterConfig explicitConfig = new LoggingMetricsReporterConfig(new MapConfig(configMap));
+    assertEquals(100, explicitConfig.getLoggingIntervalSeconds(REPORTER_NAME));
+  }
+}
\ No newline at end of file