You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2022/11/21 14:06:58 UTC

[sling-org-apache-sling-distribution-journal] 01/01: Use new gauge api, constructor injection

This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch gauge
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit 0d32b1b24f630d3db9cf2a18c53a6ef784be5ed9
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Mon Nov 21 15:06:31 2022 +0100

    Use new gauge api, constructor injection
---
 .../journal/bookkeeper/BookKeeper.java             |   6 +-
 .../impl/publisher/DistributionPublisher.java      |   6 +-
 .../journal/shared/DistributionMetricsService.java | 107 ++++++---------------
 .../journal/shared/JournalAvailableChecker.java    |   6 +-
 .../shared/DistributionMetricsServiceTest.java     |  55 +----------
 .../shared/JournalAvailableCheckerTest.java        |   7 --
 6 files changed, 38 insertions(+), 149 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
index 5e85f60..35e6267 100644
--- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
+++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
@@ -35,7 +35,6 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.ResourceResolver;
@@ -52,7 +51,6 @@ import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
 import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService.GaugeService;
 import org.apache.sling.distribution.journal.shared.NoOpImportPostProcessor;
 import org.apache.sling.distribution.journal.shared.NoOpInvalidationProcessor;
 import org.osgi.service.event.Event;
@@ -102,7 +100,6 @@ public class BookKeeper implements Closeable {
     private final PackageRetries packageRetries = new PackageRetries();
     private final LocalStore statusStore;
     private final LocalStore processedOffsets;
-    private final GaugeService<Integer> retriesGauge;
     private final ImportPostProcessor importPostProcessor;
     private final InvalidationProcessor invalidationProcessor;
     private int skippedCounter = 0;
@@ -123,7 +120,7 @@ public class BookKeeper implements Closeable {
         this.logSender = logSender;
         this.config = config;
         String nameRetries = DistributionMetricsService.SUB_COMPONENT + ".current_retries;sub_name=" + config.getSubAgentName();
-        this.retriesGauge = distributionMetricsService.createGauge(nameRetries, "Retries of current package", packageRetries::getSum);
+        distributionMetricsService.createGauge(nameRetries, packageRetries::getSum);
         this.resolverFactory = resolverFactory;
         this.distributionMetricsService = distributionMetricsService;
         // Error queues are enabled when the number
@@ -385,7 +382,6 @@ public class BookKeeper implements Closeable {
 
     @Override
     public void close() throws IOException {
-        IOUtils.closeQuietly(retriesGauge);
     }
     
     private void removeFailedPackage(PackageMessage pkgMsg, long offset) throws DistributionException {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index f65486c..792e930 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -132,8 +132,6 @@ public class DistributionPublisher implements DistributionAgent {
 
     private JMXRegistration reg;
 
-    private DistributionMetricsService.GaugeService<Integer> subscriberCountGauge;
-
     private Closeable statusPoller;
 
     private DistributionLogEventListener distributionLogEventListener;
@@ -174,9 +172,8 @@ public class DistributionPublisher implements DistributionAgent {
         
         String msg = format("Started Publisher agent %s with packageBuilder %s, queuedTimeout %s",
                 pubAgentName, pkgType, queuedTimeout);
-        subscriberCountGauge = distributionMetricsService.createGauge(
+        distributionMetricsService.createGauge(
                 DistributionMetricsService.PUB_COMPONENT + ".subscriber_count;pub_name=" + pubAgentName,
-                "Current number of publish subscribers",
                 () -> discoveryService.getTopologyView().getSubscribedAgentIds().size()
         );
         
@@ -195,7 +192,6 @@ public class DistributionPublisher implements DistributionAgent {
         componentReg.unregister();
         String msg = format("Stopped Publisher agent %s with packageBuilder %s, queuedTimeout %s",
                 pubAgentName, pkgType, queuedTimeout);
-        IOUtils.closeQuietly(subscriberCountGauge);
         log.info(msg);
     }
     
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java b/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
index febdaf0..5ce4357 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
@@ -20,9 +20,6 @@ package org.apache.sling.distribution.journal.shared;
 
 import static java.lang.String.format;
 
-import java.io.Closeable;
-import java.util.Dictionary;
-import java.util.Hashtable;
 import java.util.concurrent.Callable;
 import java.util.function.Supplier;
 
@@ -32,14 +29,9 @@ import org.apache.sling.commons.metrics.Histogram;
 import org.apache.sling.commons.metrics.Meter;
 import org.apache.sling.commons.metrics.MetricsService;
 import org.apache.sling.commons.metrics.Timer;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.Constants;
-import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Reference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @Component(service = DistributionMetricsService.class)
 public class DistributionMetricsService {
@@ -50,70 +42,65 @@ public class DistributionMetricsService {
 
     public static final String SUB_COMPONENT = BASE_COMPONENT + ".subscriber";
     
-    private final Logger log = LoggerFactory.getLogger(this.getClass());
+    private final MetricsService metricsService;
 
-    @Reference
-    private MetricsService metricsService;
+    private final Counter cleanupPackageRemovedCount;
 
-    private Counter cleanupPackageRemovedCount;
+    private final Timer cleanupPackageDuration;
 
-    private Timer cleanupPackageDuration;
+    private final Histogram importedPackageSize;
 
-    private Histogram importedPackageSize;
+    private final  Histogram exportedPackageSize;
 
-    private Histogram exportedPackageSize;
+    private final  Meter acceptedRequests;
 
-    private Meter acceptedRequests;
+    private final  Meter droppedRequests;
 
-    private Meter droppedRequests;
+    private final  Counter itemsBufferSize;
 
-    private Counter itemsBufferSize;
+    private final  Timer removedPackageDuration;
 
-    private Timer removedPackageDuration;
+    private final  Timer removedFailedPackageDuration;
 
-    private Timer removedFailedPackageDuration;
+    private final  Timer importedPackageDuration;
 
-    private Timer importedPackageDuration;
+    private final  Meter failedPackageImports;
 
-    private Meter failedPackageImports;
+    private final  Timer sendStoredStatusDuration;
 
-    private Timer sendStoredStatusDuration;
+    private final  Timer processQueueItemDuration;
 
-    private Timer processQueueItemDuration;
+    private final  Timer packageDistributedDuration;
 
-    private Timer packageDistributedDuration;
+    private final  Timer packageJournalDistributionDuration;
 
-    private Timer packageJournalDistributionDuration;
+    private final  Timer buildPackageDuration;
 
-    private Timer buildPackageDuration;
+    private final  Timer enqueuePackageDuration;
 
-    private Timer enqueuePackageDuration;
+    private final  Counter queueCacheFetchCount;
 
-    private Counter queueCacheFetchCount;
+    private final  Counter queueAccessErrorCount;
 
-    private Counter queueAccessErrorCount;
-
-    private Timer importPostProcessDuration;
+    private final  Timer importPostProcessDuration;
     
-    private Counter importPostProcessSuccess;
-
-    private Counter importPostProcessRequest;
+    private final  Counter importPostProcessSuccess;
 
-    private Timer invalidationProcessDuration;
+    private final  Counter importPostProcessRequest;
 
-    private Counter invalidationProcessSuccess;
+    private final  Timer invalidationProcessDuration;
 
-    private Counter invalidationProcessRequest;
+    private final  Counter invalidationProcessSuccess;
 
-    private Counter transientImportErrors;
+    private final  Counter invalidationProcessRequest;
 
-    private Counter permanentImportErrors;
+    private final  Counter transientImportErrors;
 
-    private BundleContext context;
+    private final  Counter permanentImportErrors;
 
     @Activate
-    public void activate(BundleContext context) {
-        this.context = context;
+    public DistributionMetricsService(@Reference MetricsService metricsService) {
+        this.metricsService = metricsService;
         cleanupPackageRemovedCount = getCounter(getMetricName(PUB_COMPONENT, "cleanup_package_removed_count"));
         cleanupPackageDuration = getTimer(getMetricName(PUB_COMPONENT, "cleanup_package_duration"));
         exportedPackageSize = getHistogram(getMetricName(PUB_COMPONENT, "exported_package_size"));
@@ -369,8 +356,8 @@ public class DistributionMetricsService {
         );
     }
 
-    public <T> GaugeService<T> createGauge(String name, String description, Supplier<T> supplier) {
-        return new GaugeService<>(name, description, supplier);
+    public <T> Gauge<T> createGauge(String name, Supplier<T> supplier) {
+        return metricsService.gauge(name, supplier);
     }
 
     private String getMetricName(String component, String name) {
@@ -425,34 +412,4 @@ public class DistributionMetricsService {
 
     public Counter getPermanentImportErrors() { return permanentImportErrors; }
 
-    public class GaugeService<T> implements Gauge<T>, Closeable {
-        
-        @SuppressWarnings("rawtypes")
-        private final ServiceRegistration<Gauge> reg;
-        private final Supplier<T> supplier;
-
-        private GaugeService(String name, String description, Supplier<T> supplier) {
-            this.supplier = supplier;
-            Dictionary<String, String> props = new Hashtable<>();
-            props.put(Constants.SERVICE_DESCRIPTION, description);
-            props.put(Constants.SERVICE_VENDOR, "The Apache Software Foundation");
-            props.put(Gauge.NAME, name);
-            reg = context.registerService(Gauge.class, this, props);
-        }
-
-        @Override
-        public T getValue() {
-            return supplier.get();
-        }
-        
-        @Override
-        public void close() {
-            try {
-                reg.unregister();
-            } catch (Exception e) {
-                log.warn("Error unregistering service", e);
-            }
-        }
-    }
-    
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableChecker.java b/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableChecker.java
index 58773bd..70f1529 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableChecker.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableChecker.java
@@ -25,7 +25,6 @@ import static java.util.Objects.requireNonNull;
 import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.ExceptionEventSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService.GaugeService;
 import org.osgi.framework.BundleContext;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
@@ -67,15 +66,13 @@ public class JournalAvailableChecker implements EventHandler {
     
     JournalAvailableServiceMarker marker;
 
-    private GaugeService<Boolean> gauge;
-
     @Activate
     public void activate(JournalCheckerConfiguration config, BundleContext context) {
         requireNonNull(provider);
         requireNonNull(topics);
         this.backoffRetry = new ExponentialBackOff(config.initialRetryDelay(), config.maxRetryDelay(), true, this::run);
         this.marker = new JournalAvailableServiceMarker(context);
-        this.gauge = metrics.createGauge(DistributionMetricsService.BASE_COMPONENT + ".journal_available", "", this::isAvailable);
+        metrics.createGauge(DistributionMetricsService.BASE_COMPONENT + ".journal_available", this::isAvailable);
 
         Arrays.asList(config.trackedErrCodes()).stream().spliterator()
             .forEachRemaining(code -> metrics.getJournalErrorCodeCount(code));
@@ -86,7 +83,6 @@ public class JournalAvailableChecker implements EventHandler {
 
     @Deactivate
     public void deactivate() {
-        gauge.close();
         this.marker.unRegister();
         IOUtils.closeQuietly(this.backoffRetry);
         LOG.info("Stopped Journal availability checker service");
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
index c536844..fc57bf7 100644
--- a/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
@@ -18,62 +18,30 @@
  */
 package org.apache.sling.distribution.journal.shared;
 
-import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import org.apache.sling.commons.metrics.Counter;
-import org.apache.sling.commons.metrics.Gauge;
 import org.apache.sling.commons.metrics.Histogram;
 import org.apache.sling.commons.metrics.Meter;
 import org.apache.sling.commons.metrics.MetricsService;
 import org.apache.sling.commons.metrics.Timer;
 import org.apache.sling.commons.metrics.Timer.Context;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService.GaugeService;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceRegistration;
 
 @RunWith(MockitoJUnitRunner.class)
 public class DistributionMetricsServiceTest {
     
-    @Mock
-    MetricsService metricsService;
-    
-    @InjectMocks
     DistributionMetricsService metrics;
 
-    @Mock
-    private BundleContext context;
-
-    @SuppressWarnings("rawtypes")
-    @Mock
-    private ServiceRegistration<Gauge> reg;
-
-    @Mock
-    private Timer timer;
-
-    @Mock
-    private Histogram histogram;
-
-    @Mock
-    private Meter meter;
-    
     @Before
     public void before() {
-        mockBehaviour(metricsService);
-        
-        when(context.registerService(Mockito.eq(Gauge.class), Mockito.any(Gauge.class), Mockito.any())).thenReturn(reg);
-        metrics.activate(context);
+        MetricsService metricsService = MetricsService.NOOP;
+        metrics = new DistributionMetricsService(metricsService);
     }
 
     public static void mockBehaviour(MetricsService metricsService) {
@@ -109,23 +77,6 @@ public class DistributionMetricsServiceTest {
         assertNotNull(metrics.getPackageStatusCounter("mockStatus"));
         assertNotNull(metrics.getTransientImportErrors());
         assertNotNull(metrics.getPermanentImportErrors());
-    }
-    
-    @Test
-    public void testGauge() {
-        GaugeService<Integer> gauge = metrics.createGauge("name", "desc", () -> 42);
-        verify(context).registerService(Mockito.eq(Gauge.class), Mockito.eq(gauge), Mockito.any());
-        assertThat(gauge.getValue(), equalTo(42));
-        gauge.close();
-        verify(reg).unregister();
-    }
-    
-    @Test
-    public void testGagueErrorOnClose() {
-        doThrow(new IllegalStateException("Expected")).when(reg).unregister();
-        GaugeService<Integer> gauge = metrics.createGauge("name", "desc", () -> 42);
-        assertThat(gauge.getValue(), equalTo(42));
-        gauge.close();
-        verify(reg).unregister();
+        metrics.createGauge("name", () -> 42);
     }
 }
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/JournalAvailableCheckerTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/JournalAvailableCheckerTest.java
index 7a40eb2..d699858 100644
--- a/src/test/java/org/apache/sling/distribution/journal/shared/JournalAvailableCheckerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/JournalAvailableCheckerTest.java
@@ -38,7 +38,6 @@ import org.apache.sling.distribution.journal.ExceptionEventSender;
 import org.apache.sling.distribution.journal.JournalAvailable;
 import org.apache.sling.distribution.journal.MessagingException;
 import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService.GaugeService;
 import org.apache.sling.distribution.journal.shared.JournalAvailableChecker.JournalCheckerConfiguration;
 import org.apache.sling.distribution.journal.shared.Topics.TopicsConfiguration;
 import org.junit.After;
@@ -78,14 +77,8 @@ public class JournalAvailableCheckerTest {
     @Mock
     private ServiceRegistration<JournalAvailable> sreg;
 
-    @SuppressWarnings("rawtypes")
-    @Mock
-    private GaugeService gauge;
-
-    @SuppressWarnings("unchecked")
     @Before
     public void before() throws Exception {
-        when(metrics.createGauge(Mockito.anyString(), Mockito.anyString(), Mockito.any())).thenReturn(gauge);
         MessagingException me = new MessagingException("expected", new RuntimeException("expected nested exception"));
         doThrow(me)
             .when(provider).assertTopic(INVALID_TOPIC);