You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2019/05/31 19:21:55 UTC

[sling-org-apache-sling-distribution-journal] branch master updated: SLING-8447 - Add gauge for current retries per subscriber (#7)

This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new 2dc2bbd  SLING-8447 - Add gauge for current retries per subscriber (#7)
2dc2bbd is described below

commit 2dc2bbd82c35ebd4d619ed7e77bf8fd781047034
Author: Christian Schneider <cs...@adobe.com>
AuthorDate: Fri May 31 21:21:51 2019 +0200

    SLING-8447 - Add gauge for current retries per subscriber (#7)
    
    * Add gauge for current retries per subscriber
---
 .../journal/impl/queue/impl/PackageRetries.java    |   3 +
 .../impl/shared/DistributionMetricsService.java    |  60 ++++++++--
 .../impl/shared/JournalAvailableChecker.java       |   8 ++
 .../journal/impl/shared/JournalAvailableGauge.java |  39 -------
 .../impl/subscriber/DistributionSubscriber.java    |   9 +-
 .../shared/DistributionMetricsServiceTest.java     | 126 +++++++++++++++++++++
 .../impl/shared/JournalAvailableCheckerTest.java   |  10 ++
 .../impl/shared/JournalAvalableGaugeTest.java      |  47 --------
 8 files changed, 208 insertions(+), 94 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetries.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetries.java
index 8a669cb..23f9463 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetries.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PackageRetries.java
@@ -44,4 +44,7 @@ public class PackageRetries {
         return pubAgentNameToRetries.getOrDefault(pubAgentName, 0);
     }
 
+    public int getSum() {
+        return pubAgentNameToRetries.values().stream().mapToInt(Integer::intValue).sum();
+    }
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
index a81630c..c44d2ee 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
@@ -18,30 +18,39 @@
  */
 package org.apache.sling.distribution.journal.impl.shared;
 
+import static java.lang.String.format;
+
+import java.io.Closeable;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.concurrent.Callable;
+import java.util.function.Supplier;
+
 import org.apache.sling.commons.metrics.Counter;
+import org.apache.sling.commons.metrics.Gauge;
 import org.apache.sling.commons.metrics.Histogram;
 import org.apache.sling.commons.metrics.Meter;
 import org.apache.sling.commons.metrics.MetricsService;
 import org.apache.sling.commons.metrics.Timer;
-
-import java.util.concurrent.Callable;
-
 import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Reference;
-
-import static java.lang.String.format;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @Component(service = DistributionMetricsService.class)
 public class DistributionMetricsService {
 
-
     public static final String BASE_COMPONENT = "distribution.journal";
 
     private static final String PUB_COMPONENT = BASE_COMPONENT + ".publisher";
 
-    private static final String SUB_COMPONENT = BASE_COMPONENT + ".subscriber";
+    public static final String SUB_COMPONENT = BASE_COMPONENT + ".subscriber";
+    
+    private Logger log = LoggerFactory.getLogger(this.getClass());
 
     @Reference
     private MetricsService metricsService;
@@ -80,8 +89,11 @@ public class DistributionMetricsService {
 
     private Counter queueCacheFetchCount;
 
+    private BundleContext context;
+
     @Activate
     public void activate(BundleContext context) {
+        this.context = context;
         cleanupPackageRemovedCount = getCounter(getMetricName(PUB_COMPONENT, "cleanup_package_removed_count"));
         cleanupPackageDuration = getTimer(getMetricName(PUB_COMPONENT, "cleanup_package_duration"));
         exportedPackageSize = getHistogram(getMetricName(PUB_COMPONENT, "exported_package_size"));
@@ -287,6 +299,10 @@ public class DistributionMetricsService {
     public Counter getQueueCacheFetchCount() {
         return queueCacheFetchCount;
     }
+    
+    public <T> GaugeService<T> createGauge(String name, String description, Supplier<T> supplier) {
+        return new GaugeService<T>(name, description, supplier);
+    }
 
     private String getMetricName(String component, String name) {
         return format("%s.%s", component, name);
@@ -308,4 +324,34 @@ public class DistributionMetricsService {
         return metricsService.meter(metricName);
     }
 
+    public class GaugeService<T> implements Gauge<T>, Closeable {
+        
+        @SuppressWarnings("rawtypes")
+        private ServiceRegistration<Gauge> reg;
+        private Supplier<T> supplier;
+
+        private GaugeService(String name, String description, Supplier<T> supplier) {
+            this.supplier = supplier;
+            Dictionary<String, String> props = new Hashtable<>();
+            props.put(Constants.SERVICE_DESCRIPTION, description);
+            props.put(Constants.SERVICE_VENDOR, "The Apache Software Foundation");
+            props.put(Gauge.NAME, name);
+            reg = context.registerService(Gauge.class, this, props);
+        }
+
+        @Override
+        public T getValue() {
+            return supplier.get();
+        }
+        
+        @Override
+        public void close() {
+            try {
+                reg.unregister();
+            } catch (Exception e) {
+                log.warn("Error unregistering service", e);
+            }
+        }
+    }
+    
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
index 4fc2540..0d51329 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
@@ -35,6 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
 import org.apache.sling.distribution.journal.JournalAvailable;
 
 @Component(
@@ -56,21 +57,28 @@ public class JournalAvailableChecker implements JournalAvailable, Runnable {
     
     @Reference
     MessagingProvider provider;
+    
+    @Reference
+    DistributionMetricsService metrics;
 
     private BundleContext context;
 
     private volatile ServiceRegistration<JournalAvailable> reg;
 
+    private GaugeService<Boolean> gauge;
+
     @Activate
     public void activate(BundleContext context) {
         requireNonNull(provider);
         requireNonNull(topics);
         this.context = context;
+        gauge = metrics.createGauge(DistributionMetricsService.BASE_COMPONENT + ".journal_available", "", this::isAvailable);
         LOG.info("Started Journal availability checker service");
     }
 
     @Deactivate
     public void deactivate() {
+        gauge.close();
         unRegister();
         LOG.info("Stopped Journal availability checker service");
     }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableGauge.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableGauge.java
deleted file mode 100644
index deefbfd..0000000
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableGauge.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.impl.shared;
-
-import org.apache.sling.commons.metrics.Gauge;
-import org.osgi.framework.Constants;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
-
-@Component(property = { 
-        Constants.SERVICE_DESCRIPTION + "=Journal Availablility Status",
-        Constants.SERVICE_VENDOR + "=The Apache Software Foundation",
-        "name=" + DistributionMetricsService.BASE_COMPONENT + ".journal_available"})
-public class JournalAvailableGauge implements Gauge<Boolean>{
-    @Reference
-    private JournalAvailableChecker journalChecker;
-
-    @Override
-    public Boolean getValue() {
-        return journalChecker.isAvailable();
-    }
-
-}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 676d150..aad12aa 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -59,6 +59,7 @@ import javax.jcr.ValueFactory;
 
 import org.apache.sling.commons.osgi.PropertiesUtil;
 import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
 import org.apache.sling.distribution.journal.impl.shared.SimpleDistributionResponse;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.impl.shared.AgentState;
@@ -167,6 +168,8 @@ public class DistributionSubscriber implements DistributionAgent {
 
     private final PackageRetries packageRetries = new PackageRetries();
 
+    private GaugeService<Integer> retriesGauge;
+
     private Closeable packagePoller;
 
     private Closeable commandPoller;
@@ -278,6 +281,9 @@ public class DistributionSubscriber implements DistributionAgent {
                 format("Queue Processor for Subscriber agent %s", subAgentName));
 
         sender = messagingProvider.createSender();
+        
+        String nameRetries = DistributionMetricsService.SUB_COMPONENT + ".current_retries;sub_name=" + config.name();
+        retriesGauge = distributionMetricsService.createGauge(nameRetries, "Retries of current package", packageRetries::getSum);
 
         int announceDelay = PropertiesUtil.toInteger(properties.get("announceDelay"), 10000);
         MessageSender<DiscoveryMessage> disSender = messagingProvider.createSender();
@@ -330,7 +336,8 @@ public class DistributionSubscriber implements DistributionAgent {
 
     @Deactivate
     public void deactivate() {
-        announcer.close();
+        IOUtils.closeQuietly(retriesGauge);
+        IOUtils.closeQuietly(announcer);
         componentReg.unregister();
         IOUtils.closeQuietly(packagePoller);
         IOUtils.closeQuietly(commandPoller);
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsServiceTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsServiceTest.java
new file mode 100644
index 0000000..1049ef5
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsServiceTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.shared;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.sling.commons.metrics.Counter;
+import org.apache.sling.commons.metrics.Gauge;
+import org.apache.sling.commons.metrics.Histogram;
+import org.apache.sling.commons.metrics.Meter;
+import org.apache.sling.commons.metrics.MetricsService;
+import org.apache.sling.commons.metrics.Timer;
+import org.apache.sling.commons.metrics.Timer.Context;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DistributionMetricsServiceTest {
+    
+    @Mock
+    MetricsService metricsService;
+    
+    @InjectMocks
+    DistributionMetricsService metrics;
+
+    @Mock
+    private BundleContext context;
+
+    @SuppressWarnings("rawtypes")
+    @Mock
+    private ServiceRegistration<Gauge> reg;
+
+    @Mock
+    private Timer timer;
+
+    @Mock
+    private Histogram histogram;
+
+    @Mock
+    private Meter meter;
+    
+    @Before
+    public void before() {
+        mockBehaviour(metricsService);
+        
+        when(context.registerService(Mockito.eq(Gauge.class), Mockito.any(Gauge.class), Mockito.any())).thenReturn(reg);
+        metrics.activate(context);
+    }
+
+    public static void mockBehaviour(MetricsService metricsService) {
+        when(metricsService.counter(Mockito.anyString())).thenReturn(Mockito.mock(Counter.class));
+        Timer timer = Mockito.mock(Timer.class);
+        when(timer.time()).thenReturn(Mockito.mock(Context.class));
+        when(metricsService.timer(Mockito.anyString())).thenReturn(timer);
+        when(metricsService.histogram(Mockito.anyString())).thenReturn(Mockito.mock(Histogram.class));
+        when(metricsService.meter(Mockito.anyString())).thenReturn(Mockito.mock(Meter.class));
+    }
+
+    @Test
+    public void testGetMetrics() {
+        assertNotNull(metrics.getAcceptedRequests());
+        assertNotNull(metrics.getBuildPackageDuration());
+        assertNotNull(metrics.getCleanupPackageDuration());
+        assertNotNull(metrics.getCleanupPackageRemovedCount());
+        assertNotNull(metrics.getDroppedRequests());
+        assertNotNull(metrics.getEnqueuePackageDuration());
+        assertNotNull(metrics.getExportedPackageSize());
+        assertNotNull(metrics.getFailedPackageImports());
+        assertNotNull(metrics.getImportedPackageDuration());
+        assertNotNull(metrics.getImportedPackageSize());
+        assertNotNull(metrics.getItemsBufferSize());
+        assertNotNull(metrics.getPackageDistributedDuration());
+        assertNotNull(metrics.getProcessQueueItemDuration());
+        assertNotNull(metrics.getQueueCacheFetchCount());
+        assertNotNull(metrics.getRemovedFailedPackageDuration());
+        assertNotNull(metrics.getRemovedPackageDuration());
+        assertNotNull(metrics.getSendStoredStatusDuration());
+    }
+    
+    @Test
+    public void testGauge() {
+        GaugeService<Integer> gauge = metrics.createGauge("name", "desc", () -> 42);
+        verify(context).registerService(Mockito.eq(Gauge.class), Mockito.eq(gauge), Mockito.any());
+        assertThat(gauge.getValue(), equalTo(42));
+        gauge.close();
+        verify(reg).unregister();
+    }
+    
+    @Test
+    public void testGagueErrorOnClose() {
+        doThrow(new IllegalStateException("Expected")).when(reg).unregister();
+        GaugeService<Integer> gauge = metrics.createGauge("name", "desc", () -> 42);
+        assertThat(gauge.getValue(), equalTo(42));
+        gauge.close();
+        verify(reg).unregister();
+    }
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
index 3226b8e..622013a 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableCheckerTest.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import org.apache.sling.distribution.journal.JournalAvailable;
 import org.apache.sling.distribution.journal.MessagingException;
 import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService.GaugeService;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -56,15 +57,24 @@ public class JournalAvailableCheckerTest {
 
     @Mock
     private MessagingProvider provider;
+    
+    @Mock
+    private DistributionMetricsService metrics;
 
     @Mock
     private BundleContext context;
 
     @Mock
     private ServiceRegistration<JournalAvailable> reg;
+
+    @SuppressWarnings("rawtypes")
+    @Mock
+    private GaugeService gauge;
     
+    @SuppressWarnings("unchecked")
     @Before
     public void before() throws Exception {
+        when(metrics.createGauge(Mockito.anyString(), Mockito.anyString(), Mockito.any())).thenReturn(gauge);
         doThrow(new MessagingException("topic is invalid"))
                 .when(provider).assertTopic(INVALID_TOPIC);
         when(context.registerService(Mockito.eq(JournalAvailable.class), Mockito.any(JournalAvailable.class), Mockito.any()))
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvalableGaugeTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvalableGaugeTest.java
deleted file mode 100644
index e3cfb61..0000000
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/JournalAvalableGaugeTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.impl.shared;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.when;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
-import org.mockito.runners.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class JournalAvalableGaugeTest {
-
-    @Mock
-    JournalAvailableChecker journalChecker;
-    
-    @InjectMocks
-    private JournalAvailableGauge journalAvailableGauge;
-
-    @Test
-    public void test() {
-        when(journalChecker.isAvailable()).thenReturn(true, false);
-        assertThat(journalAvailableGauge.getValue(), equalTo(true));
-        assertThat(journalAvailableGauge.getValue(), equalTo(false));
-    }
-    
-}