You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2019/05/31 19:21:55 UTC
[sling-org-apache-sling-distribution-journal] branch master updated: SLING-8447 - Add gauge for current retries per subscriber (#7)
This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new 2dc2bbd SLING-8447 - Add gauge for current retries per subscriber (#7)
2dc2bbd is described below
commit 2dc2bbd82c35ebd4d619ed7e77bf8fd781047034
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Fri May 31 21:21:51 2019 +0200
SLING-8447 - Add gauge for current retries per subscriber (#7)
* Add gauge for current retries per subscriber
---
.../journal/impl/queue/impl/PackageRetries.java | 3 +
.../impl/shared/DistributionMetricsService.java | 60 ++++++++--
.../impl/shared/JournalAvailableChecker.java | 8 ++
.../journal/impl/shared/JournalAvailableGauge.java | 39 -------
.../impl/subscriber/DistributionSubscriber.java | 9 +-
.../shared/DistributionMetricsServiceTest.java | 126 +++++++++++++++++++++
.../impl/shared/JournalAvailableCheckerTest.java | 10 ++
.../impl/shared/JournalAvalableGaugeTest.java | 47 --------
8 files changed, 208 insertions(+), 94 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetries.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetries.java
index 8a669cb..23f9463 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetries.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetries.java
@@ -44,4 +44,7 @@ public class PackageRetries {
return pubAgentNameToRetries.getOrDefault(pubAgentName, 0);
}
+ public int getSum() {
+ return pubAgentNameToRetries.values().stream().mapToInt(Integer::intValue).sum();
+ }
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
index a81630c..c44d2ee 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
@@ -18,30 +18,39 @@
*/
package org.apache.sling.distribution.journal.impl.shared;
+import static java.lang.String.format;
+
+import java.io.Closeable;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.concurrent.Callable;
+import java.util.function.Supplier;
+
import org.apache.sling.commons.metrics.Counter;
+import org.apache.sling.commons.metrics.Gauge;
import org.apache.sling.commons.metrics.Histogram;
import org.apache.sling.commons.metrics.Meter;
import org.apache.sling.commons.metrics.MetricsService;
import org.apache.sling.commons.metrics.Timer;
-
-import java.util.concurrent.Callable;
-
import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
-
-import static java.lang.String.format;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@Component(service = DistributionMetricsService.class)
public class DistributionMetricsService {
-
public static final String BASE_COMPONENT = "distribution.journal";
private static final String PUB_COMPONENT = BASE_COMPONENT + ".publisher";
- private static final String SUB_COMPONENT = BASE_COMPONENT + ".subscriber";
+ public static final String SUB_COMPONENT = BASE_COMPONENT + ".subscriber";
+
+ private Logger log = LoggerFactory.getLogger(this.getClass());
@Reference
private MetricsService metricsService;
@@ -80,8 +89,11 @@ public class DistributionMetricsService {
private Counter queueCacheFetchCount;
+ private BundleContext context;
+
@Activate
public void activate(BundleContext context) {
+ this.context = context;
cleanupPackageRemovedCount = getCounter(getMetricName(PUB_COMPONENT, "cleanup_package_removed_count"));
cleanupPackageDuration = getTimer(getMetricName(PUB_COMPONENT, "cleanup_package_duration"));
exportedPackageSize = getHistogram(getMetricName(PUB_COMPONENT, "exported_package_size"));
@@ -287,6 +299,10 @@ public class DistributionMetricsService {
public Counter getQueueCacheFetchCount() {
return queueCacheFetchCount;
}
+
+ public <T> GaugeService<T> createGauge(String name, String description, Supplier<T> supplier) {
+ return new GaugeService<T>(name, description, supplier);
+ }
private String getMetricName(String component, String name) {
return format("%s.%s", component, name);
@@ -308,4 +324,34 @@ public class DistributionMetricsService {
return metricsService.meter(metricName);
}
+ public class GaugeService<T> implements Gauge<T>, Closeable {
+
+ @SuppressWarnings("rawtypes")
+ private ServiceRegistration<Gauge> reg;
+ private Supplier<T> supplier;
+
+ private GaugeService(String name, String description, Supplier<T> supplier) {
+ this.supplier = supplier;
+ Dictionary<String, String> props = new Hashtable<>();
+ props.put(Constants.SERVICE_DESCRIPTION, description);
+ props.put(Constants.SERVICE_VENDOR, "The Apache Software Foundation");
+ props.put(Gauge.NAME, name);
+ reg = context.registerService(Gauge.class, this, props);
+ }
+
+ @Override
+ public T getValue() {
+ return supplier.get();
+ }
+
+ @Override
+ public void close() {
+ try {
+ reg.unregister();
+ } catch (Exception e) {
+ log.warn("Error unregistering service", e);
+ }
+ }
+ }
+
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
index 4fc2540..0d51329 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
@@ -35,6 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
import org.apache.sling.distribution.journal.JournalAvailable;
@Component(
@@ -56,21 +57,28 @@ public class JournalAvailableChecker implements JournalAvailable, Runnable {
@Reference
MessagingProvider provider;
+
+ @Reference
+ DistributionMetricsService metrics;
private BundleContext context;
private volatile ServiceRegistration<JournalAvailable> reg;
+ private GaugeService<Boolean> gauge;
+
@Activate
public void activate(BundleContext context) {
requireNonNull(provider);
requireNonNull(topics);
this.context = context;
+ gauge = metrics.createGauge(DistributionMetricsService.BASE_COMPONENT + ".journal_available", "", this::isAvailable);
LOG.info("Started Journal availability checker service");
}
@Deactivate
public void deactivate() {
+ gauge.close();
unRegister();
LOG.info("Stopped Journal availability checker service");
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableGauge.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableGauge.java
deleted file mode 100644
index deefbfd..0000000
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableGauge.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.impl.shared;
-
-import org.apache.sling.commons.metrics.Gauge;
-import org.osgi.framework.Constants;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
-
-@Component(property = {
- Constants.SERVICE_DESCRIPTION + "=Journal Availablility Status",
- Constants.SERVICE_VENDOR + "=The Apache Software Foundation",
- "name=" + DistributionMetricsService.BASE_COMPONENT + ".journal_available"})
-public class JournalAvailableGauge implements Gauge<Boolean>{
- @Reference
- private JournalAvailableChecker journalChecker;
-
- @Override
- public Boolean getValue() {
- return journalChecker.isAvailable();
- }
-
-}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 676d150..aad12aa 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -59,6 +59,7 @@ import javax.jcr.ValueFactory;
import org.apache.sling.commons.osgi.PropertiesUtil;
import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.impl.shared.AgentState;
@@ -167,6 +168,8 @@ public class DistributionSubscriber implements DistributionAgent {
private final PackageRetries packageRetries = new PackageRetries();
+ private GaugeService<Integer> retriesGauge;
+
private Closeable packagePoller;
private Closeable commandPoller;
@@ -278,6 +281,9 @@ public class DistributionSubscriber implements DistributionAgent {
format("Queue Processor for Subscriber agent %s", subAgentName));
sender = messagingProvider.createSender();
+
+ String nameRetries = DistributionMetricsService.SUB_COMPONENT + ".current_retries;sub_name=" + config.name();
+ retriesGauge = distributionMetricsService.createGauge(nameRetries, "Retries of current package", packageRetries::getSum);
int announceDelay = PropertiesUtil.toInteger(properties.get("announceDelay"), 10000);
MessageSender<DiscoveryMessage> disSender = messagingProvider.createSender();
@@ -330,7 +336,8 @@ public class DistributionSubscriber implements DistributionAgent {
@Deactivate
public void deactivate() {
- announcer.close();
+ IOUtils.closeQuietly(retriesGauge);
+ IOUtils.closeQuietly(announcer);
componentReg.unregister();
IOUtils.closeQuietly(packagePoller);
IOUtils.closeQuietly(commandPoller);
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsServiceTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsServiceTest.java
new file mode 100644
index 0000000..1049ef5
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsServiceTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.shared;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.sling.commons.metrics.Counter;
+import org.apache.sling.commons.metrics.Gauge;
+import org.apache.sling.commons.metrics.Histogram;
+import org.apache.sling.commons.metrics.Meter;
+import org.apache.sling.commons.metrics.MetricsService;
+import org.apache.sling.commons.metrics.Timer;
+import org.apache.sling.commons.metrics.Timer.Context;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DistributionMetricsServiceTest {
+
+ @Mock
+ MetricsService metricsService;
+
+ @InjectMocks
+ DistributionMetricsService metrics;
+
+ @Mock
+ private BundleContext context;
+
+ @SuppressWarnings("rawtypes")
+ @Mock
+ private ServiceRegistration<Gauge> reg;
+
+ @Mock
+ private Timer timer;
+
+ @Mock
+ private Histogram histogram;
+
+ @Mock
+ private Meter meter;
+
+ @Before
+ public void before() {
+ mockBehaviour(metricsService);
+
+ when(context.registerService(Mockito.eq(Gauge.class), Mockito.any(Gauge.class), Mockito.any())).thenReturn(reg);
+ metrics.activate(context);
+ }
+
+ public static void mockBehaviour(MetricsService metricsService) {
+ when(metricsService.counter(Mockito.anyString())).thenReturn(Mockito.mock(Counter.class));
+ Timer timer = Mockito.mock(Timer.class);
+ when(timer.time()).thenReturn(Mockito.mock(Context.class));
+ when(metricsService.timer(Mockito.anyString())).thenReturn(timer);
+ when(metricsService.histogram(Mockito.anyString())).thenReturn(Mockito.mock(Histogram.class));
+ when(metricsService.meter(Mockito.anyString())).thenReturn(Mockito.mock(Meter.class));
+ }
+
+ @Test
+ public void testGetMetrics() {
+ assertNotNull(metrics.getAcceptedRequests());
+ assertNotNull(metrics.getBuildPackageDuration());
+ assertNotNull(metrics.getCleanupPackageDuration());
+ assertNotNull(metrics.getCleanupPackageRemovedCount());
+ assertNotNull(metrics.getDroppedRequests());
+ assertNotNull(metrics.getEnqueuePackageDuration());
+ assertNotNull(metrics.getExportedPackageSize());
+ assertNotNull(metrics.getFailedPackageImports());
+ assertNotNull(metrics.getImportedPackageDuration());
+ assertNotNull(metrics.getImportedPackageSize());
+ assertNotNull(metrics.getItemsBufferSize());
+ assertNotNull(metrics.getPackageDistributedDuration());
+ assertNotNull(metrics.getProcessQueueItemDuration());
+ assertNotNull(metrics.getQueueCacheFetchCount());
+ assertNotNull(metrics.getRemovedFailedPackageDuration());
+ assertNotNull(metrics.getRemovedPackageDuration());
+ assertNotNull(metrics.getSendStoredStatusDuration());
+ }
+
+ @Test
+ public void testGauge() {
+ GaugeService<Integer> gauge = metrics.createGauge("name", "desc", () -> 42);
+ verify(context).registerService(Mockito.eq(Gauge.class), Mockito.eq(gauge), Mockito.any());
+ assertThat(gauge.getValue(), equalTo(42));
+ gauge.close();
+ verify(reg).unregister();
+ }
+
+ @Test
+ public void testGagueErrorOnClose() {
+ doThrow(new IllegalStateException("Expected")).when(reg).unregister();
+ GaugeService<Integer> gauge = metrics.createGauge("name", "desc", () -> 42);
+ assertThat(gauge.getValue(), equalTo(42));
+ gauge.close();
+ verify(reg).unregister();
+ }
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
index 3226b8e..622013a 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
@@ -31,6 +31,7 @@ import java.util.Map;
import org.apache.sling.distribution.journal.JournalAvailable;
import org.apache.sling.distribution.journal.MessagingException;
import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -56,15 +57,24 @@ public class JournalAvailableCheckerTest {
@Mock
private MessagingProvider provider;
+
+ @Mock
+ private DistributionMetricsService metrics;
@Mock
private BundleContext context;
@Mock
private ServiceRegistration<JournalAvailable> reg;
+
+ @SuppressWarnings("rawtypes")
+ @Mock
+ private GaugeService gauge;
+ @SuppressWarnings("unchecked")
@Before
public void before() throws Exception {
+ when(metrics.createGauge(Mockito.anyString(), Mockito.anyString(), Mockito.any())).thenReturn(gauge);
doThrow(new MessagingException("topic is invalid"))
.when(provider).assertTopic(INVALID_TOPIC);
when(context.registerService(Mockito.eq(JournalAvailable.class), Mockito.any(JournalAvailable.class), Mockito.any()))
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvalableGaugeTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvalableGaugeTest.java
deleted file mode 100644
index e3cfb61..0000000
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvalableGaugeTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.impl.shared;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.when;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
-import org.mockito.runners.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class JournalAvalableGaugeTest {
-
- @Mock
- JournalAvailableChecker journalChecker;
-
- @InjectMocks
- private JournalAvailableGauge journalAvailableGauge;
-
- @Test
- public void test() {
- when(journalChecker.isAvailable()).thenReturn(true, false);
- assertThat(journalAvailableGauge.getValue(), equalTo(true));
- assertThat(journalAvailableGauge.getValue(), equalTo(false));
- }
-
-}