You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by tm...@apache.org on 2020/07/07 14:42:00 UTC
[sling-org-apache-sling-distribution-journal] branch master
updated: SLING-9569: Emit error code when error event received if available
(#51)
This is an automated email from the ASF dual-hosted git repository.
tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new c2e055c SLING-9569: Emit error code when error event received if available (#51)
c2e055c is described below
commit c2e055c64426acfa9c5ee754b1d50a964dba2a34
Author: Amit Jain <am...@apache.org>
AuthorDate: Tue Jul 7 20:11:53 2020 +0530
SLING-9569: Emit error code when error event received if available (#51)
- Emit metric counter for the error code received from error event if available
---
.../journal/shared/DistributionMetricsService.java | 16 +++++-
.../journal/shared/JournalAvailableChecker.java | 19 ++++++-
.../shared/JournalAvailableCheckerTest.java | 66 +++++++++++++++++++---
3 files changed, 90 insertions(+), 11 deletions(-)
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java b/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
index 56e3b72..aaca10b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/DistributionMetricsService.java
@@ -311,7 +311,17 @@ public class DistributionMetricsService {
public Counter getQueueAccessErrorCount() {
return queueAccessErrorCount;
}
-
+
+ /**
+ * Returns the counter for the given journal error code.
+ * @param errorCode error code set as the value of the "error_code" label on the metric name
+ * @return a Sling Metric counter
+ */
+ public Counter getJournalErrorCodeCount(String errorCode) {
+ return getCounter(
+ getNameWithLabel(getMetricName(BASE_COMPONENT, "journal_unavailable_error_code_count"), "error_code", errorCode));
+ }
+
public <T> GaugeService<T> createGauge(String name, String description, Supplier<T> supplier) {
return new GaugeService<>(name, description, supplier);
}
@@ -320,6 +330,10 @@ public class DistributionMetricsService {
return format("%s.%s", component, name);
}
+ private String getNameWithLabel(String name, String label, String labelVal) { // formats as "name;label=labelVal"
+ return format("%s;%s=%s", name, label, labelVal);
+ }
+
private Counter getCounter(String metricName) {
return metricsService.counter(metricName);
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableChecker.java b/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableChecker.java
index 6cb5170..1a5180f 100644
--- a/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableChecker.java
+++ b/src/main/java/org/apache/sling/distribution/journal/shared/JournalAvailableChecker.java
@@ -18,6 +18,8 @@
*/
package org.apache.sling.distribution.journal.shared;
+import java.util.Arrays;
+
import static java.util.Objects.requireNonNull;
import org.apache.commons.io.IOUtils;
@@ -63,10 +65,10 @@ public class JournalAvailableChecker implements EventHandler {
@Reference
DistributionMetricsService metrics;
- private JournalAvailableServiceMarker marker;
+ JournalAvailableServiceMarker marker;
private GaugeService<Boolean> gauge;
-
+
@Activate
public void activate(JournalCheckerConfiguration config, BundleContext context) {
requireNonNull(provider);
@@ -74,6 +76,10 @@ public class JournalAvailableChecker implements EventHandler {
this.backoffRetry = new ExponentialBackOff(config.initialRetryDelay(), config.maxRetryDelay(), true, this::run);
this.marker = new JournalAvailableServiceMarker(context);
this.gauge = metrics.createGauge(DistributionMetricsService.BASE_COMPONENT + ".journal_available", "", this::isAvailable);
+
+ Arrays.asList(config.trackedErrCodes()).stream().spliterator()
+ .forEachRemaining(code -> metrics.getJournalErrorCodeCount(code));
+
this.marker.register();
LOG.info("Started Journal availability checker service with initialRetryDelay {}, maxRetryDelay {}. Journal is initially assumed available.", config.initialRetryDelay(), config.maxRetryDelay());
}
@@ -122,6 +128,7 @@ public class JournalAvailableChecker implements EventHandler {
@Override
public synchronized void handleEvent(Event event) {
String type = (String) event.getProperty(ExceptionEventSender.KEY_TYPE);
+
if (this.marker.isRegistered()) {
LOG.warn("Received exception event {}. Journal is considered unavailable.", type);
this.marker.unRegister();
@@ -129,6 +136,10 @@ public class JournalAvailableChecker implements EventHandler {
} else {
LOG.info("Received exception event {}. Journal still unavailable.", type);
}
+ String errCode = (String) event.getProperty(ExceptionEventSender.KEY_ERROR_CODE);
+ if ((errCode != null) && !errCode.isEmpty()) {
+ metrics.getJournalErrorCodeCount(errCode).increment();
+ }
}
@ObjectClassDefinition(name = "Apache Sling Journal based Distribution - Journal Checker")
@@ -142,6 +153,8 @@ public class JournalAvailableChecker implements EventHandler {
description = "The max retry delay in milliseconds.")
long maxRetryDelay() default MAX_RETRY_DELAY;
+ @AttributeDefinition(name ="Tracked response codes",
+ description = "Response error codes tracked in metrics.")
+ String[] trackedErrCodes() default {"400", "401", "404", "405", "413", "500", "503", "505"};
}
-
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/JournalAvailableCheckerTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/JournalAvailableCheckerTest.java
index 20d5115..0ed61e3 100644
--- a/src/test/java/org/apache/sling/distribution/journal/shared/JournalAvailableCheckerTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/JournalAvailableCheckerTest.java
@@ -21,6 +21,7 @@ package org.apache.sling.distribution.journal.shared;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doThrow;
@@ -30,7 +31,9 @@ import static org.osgi.util.converter.Converters.standardConverter;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.sling.commons.metrics.Counter;
import org.apache.sling.distribution.journal.ExceptionEventSender;
import org.apache.sling.distribution.journal.JournalAvailable;
import org.apache.sling.distribution.journal.MessagingException;
@@ -78,13 +81,14 @@ public class JournalAvailableCheckerTest {
@SuppressWarnings("rawtypes")
@Mock
private GaugeService gauge;
-
+
@SuppressWarnings("unchecked")
@Before
public void before() throws Exception {
when(metrics.createGauge(Mockito.anyString(), Mockito.anyString(), Mockito.any())).thenReturn(gauge);
- doThrow(new MessagingException("expected", new RuntimeException("expected nested exception")))
- .when(provider).assertTopic(INVALID_TOPIC);
+ MessagingException me = new MessagingException("expected", new RuntimeException("expected nested exception"));
+ doThrow(me)
+ .when(provider).assertTopic(INVALID_TOPIC);
when(context.registerService(Mockito.eq(JournalAvailable.class), Mockito.any(JournalAvailable.class), Mockito.any()))
.thenReturn(sreg);
Map<String, String> config = new HashMap<>();
@@ -114,18 +118,36 @@ public class JournalAvailableCheckerTest {
@Test
public void testActivateChecksOnEvent() throws InterruptedException {
+ activateChecksOnEvent(new MessagingException("Expected", "400"), 1);
+ }
+
+ @Test
+ public void testActivateChecksOnEventNoErrCode() throws InterruptedException {
+ activateChecksOnEvent(new MessagingException("Expected"), 0);
+ }
+
+ @Test
+ public void testActivateChecksOnEventOtherException() throws InterruptedException {
+ activateChecksOnEvent(new IOException("Expected"), 0);
+ }
+
+ public void activateChecksOnEvent(Exception e, int expectedCounter) throws InterruptedException {
+ Counter counter = new TestCounter();
+ when(metrics.getJournalErrorCodeCount("400")).thenReturn(counter);
+
await("At the start checks are triggers and should set the state available")
.until(checker::isAvailable);
-
+
makeCheckFail();
- Event event = createErrorEvent(new IOException("Expected"));
+ Event event = createErrorEvent(e);
checker.handleEvent(event);
+ assertEquals(expectedCounter, counter.getCount());
await().until(() -> !checker.isAvailable());
Thread.sleep(1000); // Make sure we get at least one failed doCheck
makeCheckSucceed();
await().until(checker::isAvailable);
}
-
+
private void makeCheckSucceed() {
topics.activate(configuration(emptyMap(), TopicsConfiguration.class));
}
@@ -144,6 +166,36 @@ public class JournalAvailableCheckerTest {
Map<String, String> props = new HashMap<>();
props.put(ExceptionEventSender.KEY_TYPE, e.getClass().getName());
props.put(ExceptionEventSender.KEY_MESSAGE, e.getMessage());
+ if (e instanceof MessagingException) {
+ props.put(ExceptionEventSender.KEY_ERROR_CODE, ((MessagingException) e).getResponseCode());
+ }
return new Event(ExceptionEventSender.ERROR_TOPIC, props);
}
-}
\ No newline at end of file
+
+ class TestCounter implements Counter { // in-memory Counter for tests, backed by an AtomicLong
+ AtomicLong l = new AtomicLong(); // current count
+ @Override public void increment() {
+ l.getAndIncrement();
+ }
+
+ @Override public void decrement() {
+ l.decrementAndGet();
+ }
+
+ @Override public void increment(long n) {
+ l.addAndGet(n);
+ }
+
+ @Override public void decrement(long n) {
+ l.addAndGet(-n);
+ }
+
+ @Override public long getCount() {
+ return l.get();
+ }
+
+ @Override public <A> A adaptTo(Class<A> type) {
+ return null; // this test counter never adapts to another type
+ }
+ }
+}