You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/01/13 10:01:27 UTC

svn commit: r1557667 - in /sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor: HealthCheckExecutorImpl.java HealthCheckResultCache.java

Author: cziegeler
Date: Mon Jan 13 09:01:27 2014
New Revision: 1557667

URL: http://svn.apache.org/r1557667
Log:
SLING-3278 : Provide a HealthCheckExecutor service. Simplify code for single reference execution, remove cached entry if service is unregistered

Modified:
    sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckExecutorImpl.java
    sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckResultCache.java

Modified: sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckExecutorImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckExecutorImpl.java?rev=1557667&r1=1557666&r2=1557667&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckExecutorImpl.java (original)
+++ sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckExecutorImpl.java Mon Jan 13 09:01:27 2014
@@ -44,12 +44,17 @@ import org.apache.sling.commons.osgi.Pro
 import org.apache.sling.commons.threads.ModifiableThreadPoolConfig;
 import org.apache.sling.commons.threads.ThreadPool;
 import org.apache.sling.commons.threads.ThreadPoolManager;
+import org.apache.sling.hc.api.HealthCheck;
 import org.apache.sling.hc.api.Result;
 import org.apache.sling.hc.api.execution.HealthCheckExecutionResult;
 import org.apache.sling.hc.api.execution.HealthCheckExecutor;
 import org.apache.sling.hc.util.HealthCheckFilter;
 import org.apache.sling.hc.util.HealthCheckMetadata;
 import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceEvent;
+import org.osgi.framework.ServiceListener;
 import org.osgi.framework.ServiceReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,7 +67,7 @@ import org.slf4j.LoggerFactory;
 @Component(label = "Apache Sling Health Check Executor",
         description = "Runs health checks for a given list of tags in parallel.",
         metatype = true, immediate = true)
-public class HealthCheckExecutorImpl implements ExtendedHealthCheckExecutor {
+public class HealthCheckExecutorImpl implements ExtendedHealthCheckExecutor, ServiceListener {
 
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
@@ -93,7 +98,7 @@ public class HealthCheckExecutorImpl imp
 
     private long resultCacheTtlInMs;
 
-    private HealthCheckResultCache healthCheckResultCache = new HealthCheckResultCache();
+    private final HealthCheckResultCache healthCheckResultCache = new HealthCheckResultCache();
 
     private Map<HealthCheckMetadata, HealthCheckFuture> stillRunningFutures = new ConcurrentHashMap<HealthCheckMetadata, HealthCheckFuture>();
 
@@ -112,6 +117,14 @@ public class HealthCheckExecutorImpl imp
         hcThreadPool = threadPoolManager.create(hcThreadPoolConfig, "Health Check Thread Pool");
 
         this.modified(properties);
+
+        try {
+            this.bundleContext.addServiceListener(this, "("
+                    + Constants.OBJECTCLASS + "=" + HealthCheck.class.getName() + ")");
+        } catch (final InvalidSyntaxException ise) {
+            // this should really never happen as the expression above is constant
+            throw new RuntimeException("Unexpected exception occured.", ise);
+        }
     }
 
     @Modified
@@ -137,7 +150,17 @@ public class HealthCheckExecutorImpl imp
     @Deactivate
     protected final void deactivate() {
         threadPoolManager.release(hcThreadPool);
+        this.bundleContext.removeServiceListener(this);
         this.bundleContext = null;
+        this.healthCheckResultCache.clear();
+    }
+
+    @Override
+    public void serviceChanged(final ServiceEvent event) {
+        if ( event.getType() == ServiceEvent.UNREGISTERING ) {
+            final Long serviceId = (Long)event.getServiceReference().getProperty(Constants.SERVICE_ID);
+            this.healthCheckResultCache.removeCachedResult(serviceId);
+        }
     }
 
     /**
@@ -162,11 +185,8 @@ public class HealthCheckExecutorImpl imp
      */
     @Override
     public HealthCheckExecutionResult execute(final ServiceReference ref) {
-        final List<HealthCheckExecutionResult> result = this.execute(new ServiceReference[] {ref});
-        if ( result.size() > 0 ) {
-            return result.get(0);
-        }
-        return null;
+        final HealthCheckMetadata metadata = this.getHealthCheckMetadata(ref);
+        return createResultsForDescriptor(metadata);
     }
 
     /**
@@ -177,7 +197,7 @@ public class HealthCheckExecutorImpl imp
         stopWatch.start();
 
         final List<HealthCheckExecutionResult> results = new ArrayList<HealthCheckExecutionResult>();
-        final List<HealthCheckMetadata> healthCheckDescriptors = getHealthCheckDescriptors(healthCheckReferences);
+        final List<HealthCheckMetadata> healthCheckDescriptors = getHealthCheckMetadata(healthCheckReferences);
 
         createResultsForDescriptors(healthCheckDescriptors, results);
 
@@ -209,20 +229,43 @@ public class HealthCheckExecutorImpl imp
         // everything else is executed in parallel via futures
         List<HealthCheckFuture> futures = createOrReuseFutures(healthCheckDescriptors);
 
-        // wait for futures at most until timeout (but will return earlier if all futures are finsihed)
+        // wait for futures at most until timeout (but will return earlier if all futures are finished)
         waitForFuturesRespectingTimeout(futures);
         collectResultsFromFutures(futures, results);
 
         healthCheckResultCache.updateWith(results);
     }
 
+    private HealthCheckExecutionResult createResultsForDescriptor(final HealthCheckMetadata metadata) {
+        // -- All methods below check if they can transform a healthCheckDescriptor into a result
+        // -- if yes the descriptor is removed from the list and the result added
+
+        // reuse cached results where possible
+        HealthCheckExecutionResult result;
+
+        result = healthCheckResultCache.useValidCacheResults(metadata, resultCacheTtlInMs);
+
+        if ( result == null ) {
+            // everything else is executed in parallel via futures
+            final HealthCheckFuture future = createOrReuseFuture(metadata);
+
+            // wait for futures at most until timeout (but will return earlier if all futures are finished)
+            waitForFuturesRespectingTimeout(future);
+            result = collectResultFromFuture(future);
+        }
+
+        healthCheckResultCache.updateWith(result);
+
+        return result;
+    }
+
     /**
-     * Create the health check descriptors
+     * Create the health check metadata
      */
-    private List<HealthCheckMetadata> getHealthCheckDescriptors(final ServiceReference... healthCheckReferences) {
+    private List<HealthCheckMetadata> getHealthCheckMetadata(final ServiceReference... healthCheckReferences) {
         final List<HealthCheckMetadata> descriptors = new LinkedList<HealthCheckMetadata>();
         for (final ServiceReference serviceReference : healthCheckReferences) {
-            final HealthCheckMetadata descriptor = new HealthCheckMetadata(serviceReference);
+            final HealthCheckMetadata descriptor = getHealthCheckMetadata(serviceReference);
 
             descriptors.add(descriptor);
         }
@@ -230,28 +273,55 @@ public class HealthCheckExecutorImpl imp
         return descriptors;
     }
 
-    private List<HealthCheckFuture> createOrReuseFutures(final List<HealthCheckMetadata> healthCheckDescriptors) {
-        List<HealthCheckFuture> futuresForResultOfThisCall = new LinkedList<HealthCheckFuture>();
+    /**
+     * Create the health check metadata
+     */
+    private HealthCheckMetadata getHealthCheckMetadata(final ServiceReference healthCheckReference) {
+        final HealthCheckMetadata descriptor = new HealthCheckMetadata(healthCheckReference);
+        return descriptor;
+    }
 
-        for (final HealthCheckMetadata healthCheckDescriptor : healthCheckDescriptors) {
+    private List<HealthCheckFuture> createOrReuseFutures(final List<HealthCheckMetadata> healthCheckDescriptors) {
+        final List<HealthCheckFuture> futuresForResultOfThisCall = new LinkedList<HealthCheckFuture>();
 
-            HealthCheckFuture stillRunningFuture = this.stillRunningFutures.get(healthCheckDescriptor);
-            HealthCheckFuture resultFuture;
-            if (stillRunningFuture != null && !stillRunningFuture.isDone()) {
-                logger.debug("Found a future that is still running for {}", healthCheckDescriptor);
-                resultFuture = stillRunningFuture;
-            } else {
-                logger.debug("Creating future for {}", healthCheckDescriptor);
-                resultFuture = new HealthCheckFuture(healthCheckDescriptor, bundleContext);
-                this.hcThreadPool.execute(resultFuture);
-            }
+        for (final HealthCheckMetadata md : healthCheckDescriptors) {
 
-            futuresForResultOfThisCall.add(resultFuture);
+            futuresForResultOfThisCall.add(createOrReuseFuture(md));
 
         }
         return futuresForResultOfThisCall;
     }
 
+    private HealthCheckFuture createOrReuseFuture(final HealthCheckMetadata metadata) {
+        HealthCheckFuture stillRunningFuture = this.stillRunningFutures.get(metadata);
+        HealthCheckFuture resultFuture;
+        if (stillRunningFuture != null && !stillRunningFuture.isDone()) {
+            logger.debug("Found a future that is still running for {}", metadata);
+            resultFuture = stillRunningFuture;
+        } else {
+            logger.debug("Creating future for {}", metadata);
+            resultFuture = new HealthCheckFuture(metadata, bundleContext);
+            this.hcThreadPool.execute(resultFuture);
+        }
+
+        return resultFuture;
+    }
+
+    private void waitForFuturesRespectingTimeout(final HealthCheckFuture healthCheckFuture) {
+        StopWatch callExcutionTimeStopWatch = new StopWatch();
+        callExcutionTimeStopWatch.start();
+        boolean allFuturesDone;
+        do {
+            try {
+                Thread.sleep(50);
+            } catch (InterruptedException ie) {
+                logger.warn("Unexpected InterruptedException while waiting for healthCheckContributors", ie);
+            }
+
+            allFuturesDone = healthCheckFuture.isDone();
+        } while (!allFuturesDone && callExcutionTimeStopWatch.getTime() < this.timeoutInMs);
+    }
+
     private void waitForFuturesRespectingTimeout(List<HealthCheckFuture> futuresForResultOfThisCall) {
         StopWatch callExcutionTimeStopWatch = new StopWatch();
         callExcutionTimeStopWatch.start();
@@ -272,49 +342,13 @@ public class HealthCheckExecutorImpl imp
 
     void collectResultsFromFutures(List<HealthCheckFuture> futuresForResultOfThisCall, Collection<HealthCheckExecutionResult> results) {
 
-        Set<ExecutionResult> resultsFromFutures = new HashSet<ExecutionResult>();
+        Set<HealthCheckExecutionResult> resultsFromFutures = new HashSet<HealthCheckExecutionResult>();
 
         Iterator<HealthCheckFuture> futuresIt = futuresForResultOfThisCall.iterator();
         while (futuresIt.hasNext()) {
-            HealthCheckFuture future = futuresIt.next();
-            ExecutionResult result;
-            if (future.isDone()) {
-                logger.debug("Health Check is done: {}", future.getHealthCheckMetadata());
-
-                try {
-                    result = future.get();
-                } catch (Exception e) {
-                    logger.warn("Unexpected Exception during future.get(): " + e, e);
-                    result = new ExecutionResult(future.getHealthCheckMetadata(), Result.Status.HEALTH_CHECK_ERROR,
-                            "Unexpected Exception during future.get(): " + e);
-                }
-
-                // if the future came from a previous call remove it from stillRunningFutures
-                if (this.stillRunningFutures.containsKey(future.getHealthCheckMetadata())) {
-                    this.stillRunningFutures.remove(future.getHealthCheckMetadata());
-                }
+            final HealthCheckFuture future = futuresIt.next();
+            final HealthCheckExecutionResult result = this.collectResultFromFuture(future);
 
-            } else {
-                logger.debug("Health Check timed out: {}", future.getHealthCheckMetadata());
-                // Futures must not be cancelled as interrupting a health check might could cause a corrupted repository index
-                // (CrxRoundtripCheck) or ugly messages/stack traces in the log file
-
-                this.stillRunningFutures.put(future.getHealthCheckMetadata(), future);
-
-                // normally we turn the check into WARN (normal timeout), but if the threshold time for CRITICAL is reached for a certain
-                // future we turn the result CRITICAL
-                long futureElapsedTimeMs = new Date().getTime() - future.getCreatedTime().getTime();
-                if (futureElapsedTimeMs < this.longRunningFutureThresholdForRedMs) {
-                    result = new ExecutionResult(future.getHealthCheckMetadata(), Result.Status.WARN,
-                            "Timeout: Check still running after " + msHumanReadable(futureElapsedTimeMs), futureElapsedTimeMs);
-
-                } else {
-                    result = new ExecutionResult(future.getHealthCheckMetadata(), Result.Status.CRITICAL,
-                            "Timeout: Check still running after " + msHumanReadable(futureElapsedTimeMs)
-                                    + " (exceeding the configured threshold for CRITICAL: "
-                                    + msHumanReadable(this.longRunningFutureThresholdForRedMs) + ")", futureElapsedTimeMs);
-                }
-            }
             resultsFromFutures.add(result);
             futuresIt.remove();
         }
@@ -323,6 +357,49 @@ public class HealthCheckExecutorImpl imp
         results.addAll(resultsFromFutures);
     }
 
+    HealthCheckExecutionResult collectResultFromFuture(final HealthCheckFuture future) {
+
+        HealthCheckExecutionResult result;
+        if (future.isDone()) {
+            logger.debug("Health Check is done: {}", future.getHealthCheckMetadata());
+
+            try {
+                result = future.get();
+            } catch (Exception e) {
+                logger.warn("Unexpected Exception during future.get(): " + e, e);
+                result = new ExecutionResult(future.getHealthCheckMetadata(), Result.Status.HEALTH_CHECK_ERROR,
+                        "Unexpected Exception during future.get(): " + e);
+            }
+
+            // if the future came from a previous call remove it from stillRunningFutures
+            if (this.stillRunningFutures.containsKey(future.getHealthCheckMetadata())) {
+                this.stillRunningFutures.remove(future.getHealthCheckMetadata());
+            }
+
+        } else {
+            logger.debug("Health Check timed out: {}", future.getHealthCheckMetadata());
+            // Futures must not be cancelled as interrupting a health check might could cause a corrupted repository index
+            // (CrxRoundtripCheck) or ugly messages/stack traces in the log file
+
+            this.stillRunningFutures.put(future.getHealthCheckMetadata(), future);
+
+            // normally we turn the check into WARN (normal timeout), but if the threshold time for CRITICAL is reached for a certain
+            // future we turn the result CRITICAL
+            long futureElapsedTimeMs = new Date().getTime() - future.getCreatedTime().getTime();
+            if (futureElapsedTimeMs < this.longRunningFutureThresholdForRedMs) {
+                result = new ExecutionResult(future.getHealthCheckMetadata(), Result.Status.WARN,
+                        "Timeout: Check still running after " + msHumanReadable(futureElapsedTimeMs), futureElapsedTimeMs);
+
+            } else {
+                result = new ExecutionResult(future.getHealthCheckMetadata(), Result.Status.CRITICAL,
+                        "Timeout: Check still running after " + msHumanReadable(futureElapsedTimeMs)
+                                + " (exceeding the configured threshold for CRITICAL: "
+                                + msHumanReadable(this.longRunningFutureThresholdForRedMs) + ")", futureElapsedTimeMs);
+            }
+        }
+        return result;
+    }
+
     static String msHumanReadable(final long millis) {
 
         double number = millis;

Modified: sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckResultCache.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckResultCache.java?rev=1557667&r1=1557666&r2=1557667&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckResultCache.java (original)
+++ sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckResultCache.java Mon Jan 13 09:01:27 2014
@@ -33,36 +33,47 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Caches health check results.
- *
  */
 public class HealthCheckResultCache {
 
+    /**
+     * The logger.
+     */
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
+    /**
+     * The map holding the cached results.
+     */
     private final Map<Long, HealthCheckExecutionResult> cache = new ConcurrentHashMap<Long, HealthCheckExecutionResult>();
 
-    @Override
-    public String toString() {
-        return "[HealthCheckResultCache size=" + cache.size() + "]";
-    }
-
+    /**
+     * Update the cache with the results
+     */
     public void updateWith(final Collection<HealthCheckExecutionResult> results) {
         for (final HealthCheckExecutionResult result : results) {
-            final ExecutionResult executionResult = (ExecutionResult) result;
-            cache.put(executionResult.getServiceId(), result);
+            this.updateWith(result);
         }
     }
 
+    /**
+     * Update the cache with the result
+     */
+    public void updateWith(HealthCheckExecutionResult result) {
+        final ExecutionResult executionResult = (ExecutionResult) result;
+        cache.put(executionResult.getServiceId(), result);
+    }
+
+    /**
+     * Get the valid cache results
+     */
     public void useValidCacheResults(final List<HealthCheckMetadata> metadatas,
             final Collection<HealthCheckExecutionResult> results,
             final long resultCacheTtlInMs) {
-
-
         final Set<HealthCheckExecutionResult> cachedResults = new TreeSet<HealthCheckExecutionResult>();
         final Iterator<HealthCheckMetadata> checksIt = metadatas.iterator();
         while (checksIt.hasNext()) {
-            final HealthCheckMetadata descriptor = checksIt.next();
-            final HealthCheckExecutionResult result = get(descriptor, resultCacheTtlInMs);
+            final HealthCheckMetadata md = checksIt.next();
+            final HealthCheckExecutionResult result = useValidCacheResults(md, resultCacheTtlInMs);
             if (result != null) {
                 cachedResults.add(result);
                 checksIt.remove();
@@ -72,6 +83,14 @@ public class HealthCheckResultCache {
         results.addAll(cachedResults);
     }
 
+    /**
+     * Return the cached result if it's still valid.
+     */
+    public HealthCheckExecutionResult useValidCacheResults(final HealthCheckMetadata metadata,
+            final long resultCacheTtlInMs) {
+        return get(metadata, resultCacheTtlInMs);
+    }
+
     private HealthCheckExecutionResult get(final HealthCheckMetadata metadata, final long resultCacheTtlInMs) {
         final Long key = metadata.getServiceId();
         final HealthCheckExecutionResult cachedResult = cache.get(key);
@@ -99,4 +118,23 @@ public class HealthCheckResultCache {
         return null;
     }
 
+    /**
+     * Clear the whole cache
+     */
+    public void clear() {
+        this.cache.clear();
+    }
+
+    /**
+     * Remove entry from cache
+     */
+    public void removeCachedResult(final Long serviceId) {
+        this.cache.remove(serviceId);
+    }
+
+    @Override
+    public String toString() {
+        return "[HealthCheckResultCache size=" + cache.size() + "]";
+    }
+
 }