You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cs...@apache.org on 2022/11/21 14:06:58 UTC
[sling-org-apache-sling-distribution-journal] 01/01: Use new gauge api, constructor injection
This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch gauge
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit 0d32b1b24f630d3db9cf2a18c53a6ef784be5ed9
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Mon Nov 21 15:06:31 2022 +0100
Use new gauge api, constructor injection
---
.../journal/bookkeeper/BookKeeper.java | 6 +-
.../impl/publisher/DistributionPublisher.java | 6 +-
.../journal/shared/DistributionMetricsService.java | 107 ++++++---------------
.../journal/shared/JournalAvailableChecker.java | 6 +-
.../shared/DistributionMetricsServiceTest.java | 55 +----------
.../shared/JournalAvailableCheckerTest.java | 7 --
6 files changed, 38 insertions(+), 149 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
index 5e85f60..35e6267 100644
--- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
+++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
@@ -35,7 +35,6 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
-import org.apache.commons.io.IOUtils;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.ResourceResolver;
@@ -52,7 +51,6 @@ import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
import org.apache.sling.distribution.journal.shared.DistributionMetricsService;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService.GaugeService;
import org.apache.sling.distribution.journal.shared.NoOpImportPostProcessor;
import org.apache.sling.distribution.journal.shared.NoOpInvalidationProcessor;
import org.osgi.service.event.Event;
@@ -102,7 +100,6 @@ public class BookKeeper implements Closeable {
private final PackageRetries packageRetries = new PackageRetries();
private final LocalStore statusStore;
private final LocalStore processedOffsets;
- private final GaugeService<Integer> retriesGauge;
private final ImportPostProcessor importPostProcessor;
private final InvalidationProcessor invalidationProcessor;
private int skippedCounter = 0;
@@ -123,7 +120,7 @@ public class BookKeeper implements Closeable {
this.logSender = logSender;
this.config = config;
String nameRetries = DistributionMetricsService.SUB_COMPONENT + ".current_retries;sub_name=" + config.getSubAgentName();
- this.retriesGauge = distributionMetricsService.createGauge(nameRetries, "Retries of current package", packageRetries::getSum);
+ distributionMetricsService.createGauge(nameRetries, packageRetries::getSum);
this.resolverFactory = resolverFactory;
this.distributionMetricsService = distributionMetricsService;
// Error queues are enabled when the number
@@ -385,7 +382,6 @@ public class BookKeeper implements Closeable {
@Override
public void close() throws IOException {
- IOUtils.closeQuietly(retriesGauge);
}
private void removeFailedPackage(PackageMessage pkgMsg, long offset) throws DistributionException {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index f65486c..792e930 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -132,8 +132,6 @@ public class DistributionPublisher implements DistributionAgent {
private JMXRegistration reg;
- private DistributionMetricsService.GaugeService<Integer> subscriberCountGauge;
-
private Closeable statusPoller;
private DistributionLogEventListener distributionLogEventListener;
@@ -174,9 +172,8 @@ public class DistributionPublisher implements DistributionAgent {
String msg = format("Started Publisher agent %s with packageBuilder %s, queuedTimeout %s",
pubAgentName, pkgType, queuedTimeout);
- subscriberCountGauge = distributionMetricsService.createGauge(
+ distributionMetricsService.createGauge(
DistributionMetricsService.PUB_COMPONENT + ".subscriber_count;pub_name=" + pubAgentName,
- "Current number of publish subscribers",
() -> discoveryService.getTopologyView().getSubscribedAgentIds().size()
);
@@ -195,7 +192,6 @@ public class DistributionPublisher implements DistributionAgent {
componentReg.unregister();
String msg = format("Stopped Publisher agent %s with packageBuilder %s, queuedTimeout %s",
pubAgentName, pkgType, queuedTimeout);
- IOUtils.closeQuietly(subscriberCountGauge);
log.info(msg);
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java b/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
index febdaf0..5ce4357 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
@@ -20,9 +20,6 @@ package org.apache.sling.distribution.journal.shared;
import static java.lang.String.format;
-import java.io.Closeable;
-import java.util.Dictionary;
-import java.util.Hashtable;
import java.util.concurrent.Callable;
import java.util.function.Supplier;
@@ -32,14 +29,9 @@ import org.apache.sling.commons.metrics.Histogram;
import org.apache.sling.commons.metrics.Meter;
import org.apache.sling.commons.metrics.MetricsService;
import org.apache.sling.commons.metrics.Timer;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.Constants;
-import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
@Component(service = DistributionMetricsService.class)
public class DistributionMetricsService {
@@ -50,70 +42,65 @@ public class DistributionMetricsService {
public static final String SUB_COMPONENT = BASE_COMPONENT + ".subscriber";
- private final Logger log = LoggerFactory.getLogger(this.getClass());
+ private final MetricsService metricsService;
- @Reference
- private MetricsService metricsService;
+ private final Counter cleanupPackageRemovedCount;
- private Counter cleanupPackageRemovedCount;
+ private final Timer cleanupPackageDuration;
- private Timer cleanupPackageDuration;
+ private final Histogram importedPackageSize;
- private Histogram importedPackageSize;
+ private final Histogram exportedPackageSize;
- private Histogram exportedPackageSize;
+ private final Meter acceptedRequests;
- private Meter acceptedRequests;
+ private final Meter droppedRequests;
- private Meter droppedRequests;
+ private final Counter itemsBufferSize;
- private Counter itemsBufferSize;
+ private final Timer removedPackageDuration;
- private Timer removedPackageDuration;
+ private final Timer removedFailedPackageDuration;
- private Timer removedFailedPackageDuration;
+ private final Timer importedPackageDuration;
- private Timer importedPackageDuration;
+ private final Meter failedPackageImports;
- private Meter failedPackageImports;
+ private final Timer sendStoredStatusDuration;
- private Timer sendStoredStatusDuration;
+ private final Timer processQueueItemDuration;
- private Timer processQueueItemDuration;
+ private final Timer packageDistributedDuration;
- private Timer packageDistributedDuration;
+ private final Timer packageJournalDistributionDuration;
- private Timer packageJournalDistributionDuration;
+ private final Timer buildPackageDuration;
- private Timer buildPackageDuration;
+ private final Timer enqueuePackageDuration;
- private Timer enqueuePackageDuration;
+ private final Counter queueCacheFetchCount;
- private Counter queueCacheFetchCount;
+ private final Counter queueAccessErrorCount;
- private Counter queueAccessErrorCount;
-
- private Timer importPostProcessDuration;
+ private final Timer importPostProcessDuration;
- private Counter importPostProcessSuccess;
-
- private Counter importPostProcessRequest;
+ private final Counter importPostProcessSuccess;
- private Timer invalidationProcessDuration;
+ private final Counter importPostProcessRequest;
- private Counter invalidationProcessSuccess;
+ private final Timer invalidationProcessDuration;
- private Counter invalidationProcessRequest;
+ private final Counter invalidationProcessSuccess;
- private Counter transientImportErrors;
+ private final Counter invalidationProcessRequest;
- private Counter permanentImportErrors;
+ private final Counter transientImportErrors;
- private BundleContext context;
+ private final Counter permanentImportErrors;
@Activate
- public void activate(BundleContext context) {
- this.context = context;
+ public DistributionMetricsService(@Reference MetricsService metricsService) {
+ this.metricsService = metricsService;
cleanupPackageRemovedCount = getCounter(getMetricName(PUB_COMPONENT, "cleanup_package_removed_count"));
cleanupPackageDuration = getTimer(getMetricName(PUB_COMPONENT, "cleanup_package_duration"));
exportedPackageSize = getHistogram(getMetricName(PUB_COMPONENT, "exported_package_size"));
@@ -369,8 +356,8 @@ public class DistributionMetricsService {
);
}
- public <T> GaugeService<T> createGauge(String name, String description, Supplier<T> supplier) {
- return new GaugeService<>(name, description, supplier);
+ public <T> Gauge<T> createGauge(String name, Supplier<T> supplier) {
+ return metricsService.gauge(name, supplier);
}
private String getMetricName(String component, String name) {
@@ -425,34 +412,4 @@ public class DistributionMetricsService {
public Counter getPermanentImportErrors() { return permanentImportErrors; }
- public class GaugeService<T> implements Gauge<T>, Closeable {
-
- @SuppressWarnings("rawtypes")
- private final ServiceRegistration<Gauge> reg;
- private final Supplier<T> supplier;
-
- private GaugeService(String name, String description, Supplier<T> supplier) {
- this.supplier = supplier;
- Dictionary<String, String> props = new Hashtable<>();
- props.put(Constants.SERVICE_DESCRIPTION, description);
- props.put(Constants.SERVICE_VENDOR, "The Apache Software Foundation");
- props.put(Gauge.NAME, name);
- reg = context.registerService(Gauge.class, this, props);
- }
-
- @Override
- public T getValue() {
- return supplier.get();
- }
-
- @Override
- public void close() {
- try {
- reg.unregister();
- } catch (Exception e) {
- log.warn("Error unregistering service", e);
- }
- }
- }
-
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableChecker.java b/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableChecker.java
index 58773bd..70f1529 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableChecker.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableChecker.java
@@ -25,7 +25,6 @@ import static java.util.Objects.requireNonNull;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.ExceptionEventSender;
import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService.GaugeService;
import org.osgi.framework.BundleContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
@@ -67,15 +66,13 @@ public class JournalAvailableChecker implements EventHandler {
JournalAvailableServiceMarker marker;
- private GaugeService<Boolean> gauge;
-
@Activate
public void activate(JournalCheckerConfiguration config, BundleContext context) {
requireNonNull(provider);
requireNonNull(topics);
this.backoffRetry = new ExponentialBackOff(config.initialRetryDelay(), config.maxRetryDelay(), true, this::run);
this.marker = new JournalAvailableServiceMarker(context);
- this.gauge = metrics.createGauge(DistributionMetricsService.BASE_COMPONENT + ".journal_available", "", this::isAvailable);
+ metrics.createGauge(DistributionMetricsService.BASE_COMPONENT + ".journal_available", this::isAvailable);
Arrays.asList(config.trackedErrCodes()).stream().spliterator()
.forEachRemaining(code -> metrics.getJournalErrorCodeCount(code));
@@ -86,7 +83,6 @@ public class JournalAvailableChecker implements EventHandler {
@Deactivate
public void deactivate() {
- gauge.close();
this.marker.unRegister();
IOUtils.closeQuietly(this.backoffRetry);
LOG.info("Stopped Journal availability checker service");
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
index c536844..fc57bf7 100644
--- a/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/DistributionMetricsServiceTest.java
@@ -18,62 +18,30 @@
*/
package org.apache.sling.distribution.journal.shared;
-import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.apache.sling.commons.metrics.Counter;
-import org.apache.sling.commons.metrics.Gauge;
import org.apache.sling.commons.metrics.Histogram;
import org.apache.sling.commons.metrics.Meter;
import org.apache.sling.commons.metrics.MetricsService;
import org.apache.sling.commons.metrics.Timer;
import org.apache.sling.commons.metrics.Timer.Context;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService.GaugeService;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceRegistration;
@RunWith(MockitoJUnitRunner.class)
public class DistributionMetricsServiceTest {
- @Mock
- MetricsService metricsService;
-
- @InjectMocks
DistributionMetricsService metrics;
- @Mock
- private BundleContext context;
-
- @SuppressWarnings("rawtypes")
- @Mock
- private ServiceRegistration<Gauge> reg;
-
- @Mock
- private Timer timer;
-
- @Mock
- private Histogram histogram;
-
- @Mock
- private Meter meter;
-
@Before
public void before() {
- mockBehaviour(metricsService);
-
- when(context.registerService(Mockito.eq(Gauge.class), Mockito.any(Gauge.class), Mockito.any())).thenReturn(reg);
- metrics.activate(context);
+ MetricsService metricsService = MetricsService.NOOP;
+ metrics = new DistributionMetricsService(metricsService);
}
public static void mockBehaviour(MetricsService metricsService) {
@@ -109,23 +77,6 @@ public class DistributionMetricsServiceTest {
assertNotNull(metrics.getPackageStatusCounter("mockStatus"));
assertNotNull(metrics.getTransientImportErrors());
assertNotNull(metrics.getPermanentImportErrors());
- }
-
- @Test
- public void testGauge() {
- GaugeService<Integer> gauge = metrics.createGauge("name", "desc", () -> 42);
- verify(context).registerService(Mockito.eq(Gauge.class), Mockito.eq(gauge), Mockito.any());
- assertThat(gauge.getValue(), equalTo(42));
- gauge.close();
- verify(reg).unregister();
- }
-
- @Test
- public void testGagueErrorOnClose() {
- doThrow(new IllegalStateException("Expected")).when(reg).unregister();
- GaugeService<Integer> gauge = metrics.createGauge("name", "desc", () -> 42);
- assertThat(gauge.getValue(), equalTo(42));
- gauge.close();
- verify(reg).unregister();
+ metrics.createGauge("name", () -> 42);
}
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/JournalAvailableCheckerTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/JournalAvailableCheckerTest.java
index 7a40eb2..d699858 100644
--- a/src/test/java/org/apache/sling/distribution/journal/shared/JournalAvailableCheckerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/JournalAvailableCheckerTest.java
@@ -38,7 +38,6 @@ import org.apache.sling.distribution.journal.ExceptionEventSender;
import org.apache.sling.distribution.journal.JournalAvailable;
import org.apache.sling.distribution.journal.MessagingException;
import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.shared.DistributionMetricsService.GaugeService;
import org.apache.sling.distribution.journal.shared.JournalAvailableChecker.JournalCheckerConfiguration;
import org.apache.sling.distribution.journal.shared.Topics.TopicsConfiguration;
import org.junit.After;
@@ -78,14 +77,8 @@ public class JournalAvailableCheckerTest {
@Mock
private ServiceRegistration<JournalAvailable> sreg;
- @SuppressWarnings("rawtypes")
- @Mock
- private GaugeService gauge;
-
- @SuppressWarnings("unchecked")
@Before
public void before() throws Exception {
- when(metrics.createGauge(Mockito.anyString(), Mockito.anyString(), Mockito.any())).thenReturn(gauge);
MessagingException me = new MessagingException("expected", new RuntimeException("expected nested exception"));
doThrow(me)
.when(provider).assertTopic(INVALID_TOPIC);