You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by to...@apache.org on 2014/01/15 13:30:26 UTC
svn commit: r1558361 - in /sling/trunk/contrib/extensions/replication: ./
src/main/java/org/apache/sling/replication/monitor/
src/test/java/org/apache/sling/replication/monitor/
Author: tommaso
Date: Wed Jan 15 12:30:26 2014
New Revision: 1558361
URL: http://svn.apache.org/r1558361
Log:
SLING-3318 - created replication queue health check
Added:
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/monitor/
sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheck.java (with props)
sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/monitor/
sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheckTest.java (with props)
Modified:
sling/trunk/contrib/extensions/replication/pom.xml
Modified: sling/trunk/contrib/extensions/replication/pom.xml
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/pom.xml?rev=1558361&r1=1558360&r2=1558361&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/pom.xml (original)
+++ sling/trunk/contrib/extensions/replication/pom.xml Wed Jan 15 12:30:26 2014
@@ -131,14 +131,22 @@
<version>3.3.0</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.hc.core</artifactId>
+ <version>1.0.6</version>
+ <scope>provided</scope>
+ </dependency>
<!-- LOGGING -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
+ <version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
+ <version>1.6.2</version>
<scope>runtime</scope>
</dependency>
<!-- SPECs -->
Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheck.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheck.java?rev=1558361&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheck.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheck.java Wed Jan 15 12:30:26 2014
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.monitor;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Properties;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.PropertyUnbounded;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.ReferencePolicy;
+import org.apache.felix.scr.annotations.References;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.hc.api.HealthCheck;
+import org.apache.sling.hc.api.Result;
+import org.apache.sling.hc.util.FormattingResultLog;
+import org.apache.sling.replication.queue.ReplicationQueue;
+import org.apache.sling.replication.queue.ReplicationQueueItemState;
+import org.apache.sling.replication.queue.ReplicationQueueProvider;
+import org.apache.sling.replication.serialization.ReplicationPackage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link HealthCheck} that checks whether the first item of each replication queue has been retried more than a
+ * configurable number of times.
+ * <p>
+ * Queue providers are bound and unbound dynamically, so every access to the provider collection is guarded by the
+ * collection's own monitor.
+ */
+@Component(immediate = true,
+        metatype = true,
+        label = "Apache Sling Replication Queue Health Check")
+@Properties({
+        @Property(name = HealthCheck.NAME, value = "SlingReplicationQueueHC", description = "Health Check name", label = "Name"),
+        @Property(name = HealthCheck.TAGS, unbounded = PropertyUnbounded.ARRAY, description = "Health Check tags", label = "Tags"),
+        @Property(name = HealthCheck.MBEAN_NAME, value = "slingReplicationQueue", description = "Health Check MBean name", label = "MBean name")
+})
+@References({
+        @Reference(name = "replicationQueueProvider",
+                referenceInterface = ReplicationQueueProvider.class,
+                cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE,
+                policy = ReferencePolicy.DYNAMIC,
+                bind = "bindReplicationQueueProvider",
+                unbind = "unbindReplicationQueueProvider")
+})
+@Service(value = HealthCheck.class)
+public class ReplicationQueueHealthCheck implements HealthCheck {
+
+    private static final Logger log = LoggerFactory.getLogger(ReplicationQueueHealthCheck.class);
+
+    private static final int DEFAULT_NUMBER_OF_RETRIES_ALLOWED = 3;
+
+    @Property(intValue = DEFAULT_NUMBER_OF_RETRIES_ALLOWED, description = "Number of allowed retries", label = "Allowed retries")
+    private static final String NUMBER_OF_RETRIES_ALLOWED = "numberOfRetriesAllowed";
+
+    /** Maximum number of attempts tolerated for the head item of a queue before a warning is reported. */
+    private int numberOfRetriesAllowed = DEFAULT_NUMBER_OF_RETRIES_ALLOWED;
+
+    /**
+     * Providers currently bound to this component. Created eagerly because Declarative Services may invoke the bind
+     * method before {@link #activate(Map)}; creating the collection on activation would fail for, or discard, such
+     * early bindings. Every access is synchronized on the collection itself.
+     */
+    private final Collection<ReplicationQueueProvider> replicationQueueProviders =
+            new LinkedList<ReplicationQueueProvider>();
+
+    /**
+     * Reads the allowed number of retries from the component configuration.
+     *
+     * @param properties the component configuration; a missing or unparsable value falls back to the default
+     */
+    @Activate
+    public void activate(final Map<String, Object> properties) {
+        numberOfRetriesAllowed = PropertiesUtil.toInteger(properties.get(NUMBER_OF_RETRIES_ALLOWED), DEFAULT_NUMBER_OF_RETRIES_ALLOWED);
+        log.info("Activated, numberOfRetriesAllowed={}", numberOfRetriesAllowed);
+    }
+
+    /**
+     * Drops all bound providers.
+     */
+    @Deactivate
+    protected void deactivate() throws Exception {
+        synchronized (replicationQueueProviders) {
+            replicationQueueProviders.clear();
+        }
+    }
+
+    /**
+     * Registers a provider whose queues are inspected by {@link #execute()}.
+     *
+     * @param replicationQueueProvider the provider that became available
+     */
+    protected void bindReplicationQueueProvider(final ReplicationQueueProvider replicationQueueProvider) {
+        synchronized (replicationQueueProviders) {
+            replicationQueueProviders.add(replicationQueueProvider);
+        }
+        log.debug("Registering replication queue provider {} ", replicationQueueProvider);
+    }
+
+    /**
+     * Removes a provider so that its queues are no longer inspected.
+     *
+     * @param replicationQueueProvider the provider that went away
+     */
+    protected void unbindReplicationQueueProvider(final ReplicationQueueProvider replicationQueueProvider) {
+        synchronized (replicationQueueProviders) {
+            replicationQueueProviders.remove(replicationQueueProvider);
+        }
+        log.debug("Unregistering replication queue provider {} ", replicationQueueProvider);
+    }
+
+    /**
+     * Inspects the head item of every queue of every bound provider.
+     *
+     * @return a result whose log holds a warning for each queue whose head item exceeded the allowed number of
+     *         retries, or whose inspection threw an exception
+     */
+    public Result execute() {
+        final FormattingResultLog resultLog = new FormattingResultLog();
+        final Map<String, Integer> failures = new HashMap<String, Integer>();
+
+        // iterate over a snapshot: providers can be bound or unbound while the check is running
+        final Collection<ReplicationQueueProvider> providers;
+        synchronized (replicationQueueProviders) {
+            providers = new LinkedList<ReplicationQueueProvider>(replicationQueueProviders);
+        }
+
+        if (providers.isEmpty()) {
+            resultLog.debug("No replication queue providers found");
+        }
+
+        for (ReplicationQueueProvider replicationQueueProvider : providers) {
+            for (ReplicationQueue q : replicationQueueProvider.getAllQueues()) {
+                try {
+                    ReplicationPackage item = q.getHead();
+                    if (item != null) {
+                        ReplicationQueueItemState status = q.getStatus(item);
+                        if (status.getAttempts() <= numberOfRetriesAllowed) {
+                            resultLog.debug("Queue: [{}], first item: [{}], number of retries: {}", q.getName(), item.getId(), status.getAttempts());
+                        } else {
+                            // the no. of attempts is higher than the configured threshold
+                            resultLog.warn("Queue: [{}], first item: [{}], number of retries: {}, expected number of retries <= {}",
+                                    q.getName(), item.getId(), status.getAttempts(), numberOfRetriesAllowed);
+                            failures.put(q.getName(), status.getAttempts());
+                        }
+                    } else {
+                        resultLog.debug("No items in queue [{}]", q.getName());
+                    }
+                } catch (Exception e) {
+                    resultLog.warn("Exception while inspecting replication queue [{}]: {}", q.getName(), e);
+                }
+            }
+        }
+
+        // one summary entry per queue whose head item exceeded the threshold
+        for (Map.Entry<String, Integer> entry : failures.entrySet()) {
+            resultLog.warn("Replication queue {}'s first item in the default queue has been retried {} times (threshold: {})",
+                    entry.getKey(), entry.getValue(), numberOfRetriesAllowed);
+        }
+
+        return new Result(resultLog);
+    }
+
+}
\ No newline at end of file
Propchange: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheck.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheckTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheckTest.java?rev=1558361&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheckTest.java (added)
+++ sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheckTest.java Wed Jan 15 12:30:26 2014
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.monitor;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import org.apache.sling.hc.api.Result;
+import org.apache.sling.replication.queue.ReplicationQueue;
+import org.apache.sling.replication.queue.ReplicationQueueItemState;
+import org.apache.sling.replication.queue.ReplicationQueueProvider;
+import org.apache.sling.replication.serialization.ReplicationPackage;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Testcase for {@link org.apache.sling.replication.monitor.ReplicationQueueHealthCheck}
+ */
+public class ReplicationQueueHealthCheckTest {
+
+    @Test
+    public void testWithNoReplicationQueueProvider() throws Exception {
+        final ReplicationQueueHealthCheck healthCheck = newActivatedHealthCheck();
+
+        final Result outcome = healthCheck.execute();
+
+        assertNotNull(outcome);
+        assertTrue(outcome.isOk());
+    }
+
+    @Test
+    public void testWithNoItemInTheQueue() throws Exception {
+        final ReplicationQueueHealthCheck healthCheck = newActivatedHealthCheck();
+        final ReplicationQueue emptyQueue = mock(ReplicationQueue.class);
+        when(emptyQueue.getHead()).thenReturn(null);
+        bindProviderOf(healthCheck, emptyQueue);
+
+        final Result outcome = healthCheck.execute();
+
+        assertNotNull(outcome);
+        assertTrue(outcome.isOk());
+    }
+
+    @Test
+    public void testWithOneOkItemInTheQueue() throws Exception {
+        final ReplicationQueueHealthCheck healthCheck = newActivatedHealthCheck();
+        bindProviderOf(healthCheck, queueWithHeadAttempts(1));
+
+        final Result outcome = healthCheck.execute();
+
+        assertNotNull(outcome);
+        assertTrue(outcome.isOk());
+    }
+
+    @Test
+    public void testWithNotOkItemInTheQueue() throws Exception {
+        final ReplicationQueueHealthCheck healthCheck = newActivatedHealthCheck();
+        bindProviderOf(healthCheck, queueWithHeadAttempts(10));
+
+        final Result outcome = healthCheck.execute();
+
+        assertNotNull(outcome);
+        assertFalse(outcome.isOk());
+    }
+
+    /**
+     * Creates a health check that has been activated with an empty configuration.
+     */
+    private static ReplicationQueueHealthCheck newActivatedHealthCheck() {
+        final ReplicationQueueHealthCheck healthCheck = new ReplicationQueueHealthCheck();
+        healthCheck.activate(Collections.<String, Object>emptyMap());
+        return healthCheck;
+    }
+
+    /**
+     * Creates a mocked queue whose head item reports the given number of attempts.
+     */
+    private static ReplicationQueue queueWithHeadAttempts(final int attempts) throws Exception {
+        final ReplicationPackage head = mock(ReplicationPackage.class);
+        final ReplicationQueueItemState state = mock(ReplicationQueueItemState.class);
+        when(state.getAttempts()).thenReturn(attempts);
+        final ReplicationQueue queue = mock(ReplicationQueue.class);
+        when(queue.getStatus(head)).thenReturn(state);
+        when(queue.getHead()).thenReturn(head);
+        return queue;
+    }
+
+    /**
+     * Binds a mocked provider exposing exactly the given queue to the health check.
+     */
+    private static void bindProviderOf(final ReplicationQueueHealthCheck healthCheck, final ReplicationQueue queue) {
+        final Collection<ReplicationQueue> queues = new LinkedList<ReplicationQueue>();
+        queues.add(queue);
+        final ReplicationQueueProvider provider = mock(ReplicationQueueProvider.class);
+        when(provider.getAllQueues()).thenReturn(queues);
+        healthCheck.bindReplicationQueueProvider(provider);
+    }
+}
Propchange: sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheckTest.java
------------------------------------------------------------------------------
svn:eol-style = native