You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by to...@apache.org on 2014/01/15 13:30:26 UTC

svn commit: r1558361 - in /sling/trunk/contrib/extensions/replication: ./ src/main/java/org/apache/sling/replication/monitor/ src/test/java/org/apache/sling/replication/monitor/

Author: tommaso
Date: Wed Jan 15 12:30:26 2014
New Revision: 1558361

URL: http://svn.apache.org/r1558361
Log:
SLING-3318 - created replication queue health check

Added:
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/monitor/
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheck.java   (with props)
    sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/monitor/
    sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheckTest.java   (with props)
Modified:
    sling/trunk/contrib/extensions/replication/pom.xml

Modified: sling/trunk/contrib/extensions/replication/pom.xml
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/pom.xml?rev=1558361&r1=1558360&r2=1558361&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/pom.xml (original)
+++ sling/trunk/contrib/extensions/replication/pom.xml Wed Jan 15 12:30:26 2014
@@ -131,14 +131,22 @@
       <version>3.3.0</version>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.sling</groupId>
+      <artifactId>org.apache.sling.hc.core</artifactId>
+      <version>1.0.6</version>
+      <scope>provided</scope>
+   </dependency>
     <!-- LOGGING -->
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
+      <version>1.6.2</version>
     </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-simple</artifactId>
+      <version>1.6.2</version>
       <scope>runtime</scope>
     </dependency>
     <!-- SPECs -->

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheck.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheck.java?rev=1558361&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheck.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheck.java Wed Jan 15 12:30:26 2014
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.monitor;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Properties;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.PropertyUnbounded;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.ReferencePolicy;
+import org.apache.felix.scr.annotations.References;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.hc.api.HealthCheck;
+import org.apache.sling.hc.api.Result;
+import org.apache.sling.hc.util.FormattingResultLog;
+import org.apache.sling.replication.queue.ReplicationQueue;
+import org.apache.sling.replication.queue.ReplicationQueueItemState;
+import org.apache.sling.replication.queue.ReplicationQueueProvider;
+import org.apache.sling.replication.serialization.ReplicationPackage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link HealthCheck} that looks at the head (first) item of every queue exposed by the bound
+ * {@link ReplicationQueueProvider}s and logs a warning whenever that item has been attempted
+ * more than a configurable number of times.
+ * <p>
+ * Providers are bound and unbound dynamically. Every access to the provider collection is
+ * guarded by the collection's own monitor, and {@link #execute()} works on a snapshot so that
+ * providers coming and going while a check is running cannot break the iteration.
+ */
+@Component(immediate = true,
+        metatype = true,
+        label = "Apache Sling Replication Queue Health Check")
+@Properties({
+        @Property(name = HealthCheck.NAME, value = "SlingReplicationQueueHC", description = "Health Check name", label = "Name"),
+        @Property(name = HealthCheck.TAGS, unbounded = PropertyUnbounded.ARRAY, description = "Health Check tags", label = "Tags"),
+        @Property(name = HealthCheck.MBEAN_NAME, value = "slingReplicationQueue", description = "Health Check MBean name", label = "MBean name")
+})
+@References({
+        @Reference(name = "replicationQueueProvider",
+                referenceInterface = ReplicationQueueProvider.class,
+                cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE,
+                policy = ReferencePolicy.DYNAMIC,
+                bind = "bindReplicationQueueProvider",
+                unbind = "unbindReplicationQueueProvider")
+})
+@Service(value = HealthCheck.class)
+public class ReplicationQueueHealthCheck implements HealthCheck {
+
+    private static final Logger log = LoggerFactory.getLogger(ReplicationQueueHealthCheck.class);
+
+    /** Threshold used when the component configuration does not provide one. */
+    private static final int DEFAULT_NUMBER_OF_RETRIES_ALLOWED = 3;
+
+    /** Highest number of attempts on a queue's head item that is still considered healthy. */
+    private int numberOfRetriesAllowed;
+
+    @Property(intValue = DEFAULT_NUMBER_OF_RETRIES_ALLOWED, description = "Number of allowed retries", label = "Allowed retries")
+    private static final String NUMBER_OF_RETRIES_ALLOWED = "numberOfRetriesAllowed";
+
+    /**
+     * Currently bound providers. Created eagerly (and never replaced) because the bind method
+     * may be invoked before {@link #activate(Map)}; creating the collection in activate would
+     * either fail in bind or silently drop the providers bound so far.
+     * Also used as the lock guarding every access to its content.
+     */
+    private final Collection<ReplicationQueueProvider> replicationQueueProviders =
+            new LinkedList<ReplicationQueueProvider>();
+
+    /**
+     * Reads the retry threshold from the component configuration.
+     *
+     * @param properties the component configuration; a missing or non numeric
+     *                   {@code numberOfRetriesAllowed} entry falls back to the default threshold
+     */
+    @Activate
+    public void activate(final Map<String, Object> properties) {
+        numberOfRetriesAllowed = PropertiesUtil.toInteger(properties.get(NUMBER_OF_RETRIES_ALLOWED), DEFAULT_NUMBER_OF_RETRIES_ALLOWED);
+        log.info("Activated, numberOfRetriesAllowed={}", numberOfRetriesAllowed);
+    }
+
+    /**
+     * Forgets all bound providers.
+     *
+     * @throws Exception never thrown by this implementation; kept for signature compatibility
+     */
+    @Deactivate
+    protected void deactivate() throws Exception {
+        synchronized (replicationQueueProviders) {
+            replicationQueueProviders.clear();
+        }
+    }
+
+    /**
+     * Adds a provider whose queues will be inspected by subsequent {@link #execute()} calls.
+     *
+     * @param replicationQueueProvider the provider to register
+     */
+    protected void bindReplicationQueueProvider(final ReplicationQueueProvider replicationQueueProvider) {
+        synchronized (replicationQueueProviders) {
+            replicationQueueProviders.add(replicationQueueProvider);
+        }
+        log.debug("Registering replication queue provider {} ", replicationQueueProvider);
+    }
+
+    /**
+     * Removes a previously bound provider.
+     *
+     * @param replicationQueueProvider the provider to unregister
+     */
+    protected void unbindReplicationQueueProvider(final ReplicationQueueProvider replicationQueueProvider) {
+        synchronized (replicationQueueProviders) {
+            replicationQueueProviders.remove(replicationQueueProvider);
+        }
+        log.debug("Unregistering replication queue provider {} ", replicationQueueProvider);
+    }
+
+    /**
+     * Inspects the head item of every queue of every bound provider.
+     *
+     * @return a {@link Result} built from the collected log entries: debug entries for healthy or
+     *         empty queues, warn entries for queues whose head item exceeded the threshold or
+     *         whose inspection raised an exception
+     */
+    public Result execute() {
+        final FormattingResultLog resultLog = new FormattingResultLog();
+        final Map<String, Integer> failures = new HashMap<String, Integer>();
+
+        // iterate over a snapshot: bind/unbind may happen concurrently with a running check
+        final Collection<ReplicationQueueProvider> providers;
+        synchronized (replicationQueueProviders) {
+            providers = new LinkedList<ReplicationQueueProvider>(replicationQueueProviders);
+        }
+
+        if (providers.isEmpty()) {
+            resultLog.debug("No replication queue providers found");
+        } else {
+            for (ReplicationQueueProvider replicationQueueProvider : providers) {
+                for (ReplicationQueue q : replicationQueueProvider.getAllQueues()) {
+                    checkQueue(q, resultLog, failures);
+                }
+            }
+        }
+
+        // one summary entry per queue whose head item exceeded the threshold
+        for (Map.Entry<String, Integer> entry : failures.entrySet()) {
+            resultLog.warn("Replication queue {}'s first item in the default queue has been retried {} times (threshold: {})",
+                    entry.getKey(), entry.getValue(), numberOfRetriesAllowed);
+        }
+
+        return new Result(resultLog);
+    }
+
+    /**
+     * Checks the head item of a single queue, recording the outcome in the given log and, when
+     * the threshold is exceeded, in the failures map (queue name to number of attempts).
+     */
+    private void checkQueue(final ReplicationQueue q, final FormattingResultLog resultLog,
+                            final Map<String, Integer> failures) {
+        try {
+            final ReplicationPackage item = q.getHead();
+            if (item == null) {
+                resultLog.debug("No items in queue [{}]", q.getName());
+                return;
+            }
+
+            final ReplicationQueueItemState status = q.getStatus(item);
+            final int attempts = status.getAttempts();
+            if (attempts <= numberOfRetriesAllowed) {
+                resultLog.debug("Queue: [{}], first item: [{}], number of retries: {}", q.getName(), item.getId(), attempts);
+            } else {
+                // the no. of attempts is higher than the configured threshold
+                resultLog.warn("Queue: [{}], first item: [{}], number of retries: {}, expected number of retries <= {}",
+                        q.getName(), item.getId(), attempts, numberOfRetriesAllowed);
+                failures.put(q.getName(), attempts);
+            }
+        } catch (Exception e) {
+            // a broken queue must not prevent the remaining queues from being inspected
+            resultLog.warn("Exception while inspecting replication queue [{}]: {}", q.getName(), e);
+        }
+    }
+
+}
\ No newline at end of file

Propchange: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheck.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheckTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheckTest.java?rev=1558361&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheckTest.java (added)
+++ sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheckTest.java Wed Jan 15 12:30:26 2014
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.monitor;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import org.apache.sling.hc.api.Result;
+import org.apache.sling.replication.queue.ReplicationQueue;
+import org.apache.sling.replication.queue.ReplicationQueueItemState;
+import org.apache.sling.replication.queue.ReplicationQueueProvider;
+import org.apache.sling.replication.serialization.ReplicationPackage;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests {@link org.apache.sling.replication.monitor.ReplicationQueueHealthCheck} against mocked
+ * replication queues and queue providers.
+ */
+public class ReplicationQueueHealthCheckTest {
+
+    /** Creates a health check activated with an empty configuration. */
+    private static ReplicationQueueHealthCheck newActivatedHealthCheck() {
+        ReplicationQueueHealthCheck healthCheck = new ReplicationQueueHealthCheck();
+        healthCheck.activate(Collections.<String, Object>emptyMap());
+        return healthCheck;
+    }
+
+    /** Creates a mocked queue whose head item reports the given number of attempts. */
+    private static ReplicationQueue newQueueWithHeadAttempts(int attempts) {
+        ReplicationQueue queue = mock(ReplicationQueue.class);
+        ReplicationPackage head = mock(ReplicationPackage.class);
+        ReplicationQueueItemState state = mock(ReplicationQueueItemState.class);
+        when(state.getAttempts()).thenReturn(attempts);
+        when(queue.getStatus(head)).thenReturn(state);
+        when(queue.getHead()).thenReturn(head);
+        return queue;
+    }
+
+    /** Creates a mocked provider that exposes exactly the given queue. */
+    private static ReplicationQueueProvider newProviderFor(ReplicationQueue queue) {
+        Collection<ReplicationQueue> queues = new LinkedList<ReplicationQueue>();
+        queues.add(queue);
+        ReplicationQueueProvider provider = mock(ReplicationQueueProvider.class);
+        when(provider.getAllQueues()).thenReturn(queues);
+        return provider;
+    }
+
+    @Test
+    public void testWithNoReplicationQueueProvider() throws Exception {
+        ReplicationQueueHealthCheck healthCheck = newActivatedHealthCheck();
+
+        Result outcome = healthCheck.execute();
+
+        assertNotNull(outcome);
+        assertTrue(outcome.isOk());
+    }
+
+    @Test
+    public void testWithNoItemInTheQueue() throws Exception {
+        ReplicationQueueHealthCheck healthCheck = newActivatedHealthCheck();
+        ReplicationQueue emptyQueue = mock(ReplicationQueue.class);
+        when(emptyQueue.getHead()).thenReturn(null);
+        healthCheck.bindReplicationQueueProvider(newProviderFor(emptyQueue));
+
+        Result outcome = healthCheck.execute();
+
+        assertNotNull(outcome);
+        assertTrue(outcome.isOk());
+    }
+
+    @Test
+    public void testWithOneOkItemInTheQueue() throws Exception {
+        ReplicationQueueHealthCheck healthCheck = newActivatedHealthCheck();
+        // a single attempt is below the default threshold
+        healthCheck.bindReplicationQueueProvider(newProviderFor(newQueueWithHeadAttempts(1)));
+
+        Result outcome = healthCheck.execute();
+
+        assertNotNull(outcome);
+        assertTrue(outcome.isOk());
+    }
+
+    @Test
+    public void testWithNotOkItemInTheQueue() throws Exception {
+        ReplicationQueueHealthCheck healthCheck = newActivatedHealthCheck();
+        // ten attempts exceed the default threshold
+        healthCheck.bindReplicationQueueProvider(newProviderFor(newQueueWithHeadAttempts(10)));
+
+        Result outcome = healthCheck.execute();
+
+        assertNotNull(outcome);
+        assertFalse(outcome.isOk());
+    }
+}

Propchange: sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/monitor/ReplicationQueueHealthCheckTest.java
------------------------------------------------------------------------------
    svn:eol-style = native