You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/01/13 10:01:27 UTC
svn commit: r1557667 - in
/sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor:
HealthCheckExecutorImpl.java HealthCheckResultCache.java
Author: cziegeler
Date: Mon Jan 13 09:01:27 2014
New Revision: 1557667
URL: http://svn.apache.org/r1557667
Log:
SLING-3278 : Provide a HealthCheckExecutor service. Simplify code for single reference execution, remove cached entry if service is unregistered
Modified:
sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckExecutorImpl.java
sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckResultCache.java
Modified: sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckExecutorImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckExecutorImpl.java?rev=1557667&r1=1557666&r2=1557667&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckExecutorImpl.java (original)
+++ sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckExecutorImpl.java Mon Jan 13 09:01:27 2014
@@ -44,12 +44,17 @@ import org.apache.sling.commons.osgi.Pro
import org.apache.sling.commons.threads.ModifiableThreadPoolConfig;
import org.apache.sling.commons.threads.ThreadPool;
import org.apache.sling.commons.threads.ThreadPoolManager;
+import org.apache.sling.hc.api.HealthCheck;
import org.apache.sling.hc.api.Result;
import org.apache.sling.hc.api.execution.HealthCheckExecutionResult;
import org.apache.sling.hc.api.execution.HealthCheckExecutor;
import org.apache.sling.hc.util.HealthCheckFilter;
import org.apache.sling.hc.util.HealthCheckMetadata;
import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceEvent;
+import org.osgi.framework.ServiceListener;
import org.osgi.framework.ServiceReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,7 +67,7 @@ import org.slf4j.LoggerFactory;
@Component(label = "Apache Sling Health Check Executor",
description = "Runs health checks for a given list of tags in parallel.",
metatype = true, immediate = true)
-public class HealthCheckExecutorImpl implements ExtendedHealthCheckExecutor {
+public class HealthCheckExecutorImpl implements ExtendedHealthCheckExecutor, ServiceListener {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -93,7 +98,7 @@ public class HealthCheckExecutorImpl imp
private long resultCacheTtlInMs;
- private HealthCheckResultCache healthCheckResultCache = new HealthCheckResultCache();
+ private final HealthCheckResultCache healthCheckResultCache = new HealthCheckResultCache();
private Map<HealthCheckMetadata, HealthCheckFuture> stillRunningFutures = new ConcurrentHashMap<HealthCheckMetadata, HealthCheckFuture>();
@@ -112,6 +117,14 @@ public class HealthCheckExecutorImpl imp
hcThreadPool = threadPoolManager.create(hcThreadPoolConfig, "Health Check Thread Pool");
this.modified(properties);
+
+ try {
+ this.bundleContext.addServiceListener(this, "("
+ + Constants.OBJECTCLASS + "=" + HealthCheck.class.getName() + ")");
+ } catch (final InvalidSyntaxException ise) {
+ // this should really never happen as the expression above is constant
+ throw new RuntimeException("Unexpected exception occured.", ise);
+ }
}
@Modified
@@ -137,7 +150,17 @@ public class HealthCheckExecutorImpl imp
@Deactivate
protected final void deactivate() {
threadPoolManager.release(hcThreadPool);
+ this.bundleContext.removeServiceListener(this);
this.bundleContext = null;
+ this.healthCheckResultCache.clear();
+ }
+
+ @Override
+ public void serviceChanged(final ServiceEvent event) {
+ if ( event.getType() == ServiceEvent.UNREGISTERING ) {
+ final Long serviceId = (Long)event.getServiceReference().getProperty(Constants.SERVICE_ID);
+ this.healthCheckResultCache.removeCachedResult(serviceId);
+ }
}
/**
@@ -162,11 +185,8 @@ public class HealthCheckExecutorImpl imp
*/
@Override
public HealthCheckExecutionResult execute(final ServiceReference ref) {
- final List<HealthCheckExecutionResult> result = this.execute(new ServiceReference[] {ref});
- if ( result.size() > 0 ) {
- return result.get(0);
- }
- return null;
+ final HealthCheckMetadata metadata = this.getHealthCheckMetadata(ref);
+ return createResultsForDescriptor(metadata);
}
/**
@@ -177,7 +197,7 @@ public class HealthCheckExecutorImpl imp
stopWatch.start();
final List<HealthCheckExecutionResult> results = new ArrayList<HealthCheckExecutionResult>();
- final List<HealthCheckMetadata> healthCheckDescriptors = getHealthCheckDescriptors(healthCheckReferences);
+ final List<HealthCheckMetadata> healthCheckDescriptors = getHealthCheckMetadata(healthCheckReferences);
createResultsForDescriptors(healthCheckDescriptors, results);
@@ -209,20 +229,43 @@ public class HealthCheckExecutorImpl imp
// everything else is executed in parallel via futures
List<HealthCheckFuture> futures = createOrReuseFutures(healthCheckDescriptors);
- // wait for futures at most until timeout (but will return earlier if all futures are finsihed)
+ // wait for futures at most until timeout (but will return earlier if all futures are finished)
waitForFuturesRespectingTimeout(futures);
collectResultsFromFutures(futures, results);
healthCheckResultCache.updateWith(results);
}
+ private HealthCheckExecutionResult createResultsForDescriptor(final HealthCheckMetadata metadata) {
+ // -- All methods below check if they can transform a healthCheckDescriptor into a result
+ // -- if yes the descriptor is removed from the list and the result added
+
+ // reuse cached results where possible
+ HealthCheckExecutionResult result;
+
+ result = healthCheckResultCache.useValidCacheResults(metadata, resultCacheTtlInMs);
+
+ if ( result == null ) {
+ // everything else is executed in parallel via futures
+ final HealthCheckFuture future = createOrReuseFuture(metadata);
+
+ // wait for futures at most until timeout (but will return earlier if all futures are finished)
+ waitForFuturesRespectingTimeout(future);
+ result = collectResultFromFuture(future);
+ }
+
+ healthCheckResultCache.updateWith(result);
+
+ return result;
+ }
+
/**
- * Create the health check descriptors
+ * Create the health check metadata
*/
- private List<HealthCheckMetadata> getHealthCheckDescriptors(final ServiceReference... healthCheckReferences) {
+ private List<HealthCheckMetadata> getHealthCheckMetadata(final ServiceReference... healthCheckReferences) {
final List<HealthCheckMetadata> descriptors = new LinkedList<HealthCheckMetadata>();
for (final ServiceReference serviceReference : healthCheckReferences) {
- final HealthCheckMetadata descriptor = new HealthCheckMetadata(serviceReference);
+ final HealthCheckMetadata descriptor = getHealthCheckMetadata(serviceReference);
descriptors.add(descriptor);
}
@@ -230,28 +273,55 @@ public class HealthCheckExecutorImpl imp
return descriptors;
}
- private List<HealthCheckFuture> createOrReuseFutures(final List<HealthCheckMetadata> healthCheckDescriptors) {
- List<HealthCheckFuture> futuresForResultOfThisCall = new LinkedList<HealthCheckFuture>();
+ /**
+ * Create the health check metadata
+ */
+ private HealthCheckMetadata getHealthCheckMetadata(final ServiceReference healthCheckReference) {
+ final HealthCheckMetadata descriptor = new HealthCheckMetadata(healthCheckReference);
+ return descriptor;
+ }
- for (final HealthCheckMetadata healthCheckDescriptor : healthCheckDescriptors) {
+ private List<HealthCheckFuture> createOrReuseFutures(final List<HealthCheckMetadata> healthCheckDescriptors) {
+ final List<HealthCheckFuture> futuresForResultOfThisCall = new LinkedList<HealthCheckFuture>();
- HealthCheckFuture stillRunningFuture = this.stillRunningFutures.get(healthCheckDescriptor);
- HealthCheckFuture resultFuture;
- if (stillRunningFuture != null && !stillRunningFuture.isDone()) {
- logger.debug("Found a future that is still running for {}", healthCheckDescriptor);
- resultFuture = stillRunningFuture;
- } else {
- logger.debug("Creating future for {}", healthCheckDescriptor);
- resultFuture = new HealthCheckFuture(healthCheckDescriptor, bundleContext);
- this.hcThreadPool.execute(resultFuture);
- }
+ for (final HealthCheckMetadata md : healthCheckDescriptors) {
- futuresForResultOfThisCall.add(resultFuture);
+ futuresForResultOfThisCall.add(createOrReuseFuture(md));
}
return futuresForResultOfThisCall;
}
+ private HealthCheckFuture createOrReuseFuture(final HealthCheckMetadata metadata) {
+ HealthCheckFuture stillRunningFuture = this.stillRunningFutures.get(metadata);
+ HealthCheckFuture resultFuture;
+ if (stillRunningFuture != null && !stillRunningFuture.isDone()) {
+ logger.debug("Found a future that is still running for {}", metadata);
+ resultFuture = stillRunningFuture;
+ } else {
+ logger.debug("Creating future for {}", metadata);
+ resultFuture = new HealthCheckFuture(metadata, bundleContext);
+ this.hcThreadPool.execute(resultFuture);
+ }
+
+ return resultFuture;
+ }
+
+ private void waitForFuturesRespectingTimeout(final HealthCheckFuture healthCheckFuture) {
+ StopWatch callExcutionTimeStopWatch = new StopWatch();
+ callExcutionTimeStopWatch.start();
+ boolean allFuturesDone;
+ do {
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException ie) {
+ logger.warn("Unexpected InterruptedException while waiting for healthCheckContributors", ie);
+ }
+
+ allFuturesDone = healthCheckFuture.isDone();
+ } while (!allFuturesDone && callExcutionTimeStopWatch.getTime() < this.timeoutInMs);
+ }
+
private void waitForFuturesRespectingTimeout(List<HealthCheckFuture> futuresForResultOfThisCall) {
StopWatch callExcutionTimeStopWatch = new StopWatch();
callExcutionTimeStopWatch.start();
@@ -272,49 +342,13 @@ public class HealthCheckExecutorImpl imp
void collectResultsFromFutures(List<HealthCheckFuture> futuresForResultOfThisCall, Collection<HealthCheckExecutionResult> results) {
- Set<ExecutionResult> resultsFromFutures = new HashSet<ExecutionResult>();
+ Set<HealthCheckExecutionResult> resultsFromFutures = new HashSet<HealthCheckExecutionResult>();
Iterator<HealthCheckFuture> futuresIt = futuresForResultOfThisCall.iterator();
while (futuresIt.hasNext()) {
- HealthCheckFuture future = futuresIt.next();
- ExecutionResult result;
- if (future.isDone()) {
- logger.debug("Health Check is done: {}", future.getHealthCheckMetadata());
-
- try {
- result = future.get();
- } catch (Exception e) {
- logger.warn("Unexpected Exception during future.get(): " + e, e);
- result = new ExecutionResult(future.getHealthCheckMetadata(), Result.Status.HEALTH_CHECK_ERROR,
- "Unexpected Exception during future.get(): " + e);
- }
-
- // if the future came from a previous call remove it from stillRunningFutures
- if (this.stillRunningFutures.containsKey(future.getHealthCheckMetadata())) {
- this.stillRunningFutures.remove(future.getHealthCheckMetadata());
- }
+ final HealthCheckFuture future = futuresIt.next();
+ final HealthCheckExecutionResult result = this.collectResultFromFuture(future);
- } else {
- logger.debug("Health Check timed out: {}", future.getHealthCheckMetadata());
- // Futures must not be cancelled as interrupting a health check might could cause a corrupted repository index
- // (CrxRoundtripCheck) or ugly messages/stack traces in the log file
-
- this.stillRunningFutures.put(future.getHealthCheckMetadata(), future);
-
- // normally we turn the check into WARN (normal timeout), but if the threshold time for CRITICAL is reached for a certain
- // future we turn the result CRITICAL
- long futureElapsedTimeMs = new Date().getTime() - future.getCreatedTime().getTime();
- if (futureElapsedTimeMs < this.longRunningFutureThresholdForRedMs) {
- result = new ExecutionResult(future.getHealthCheckMetadata(), Result.Status.WARN,
- "Timeout: Check still running after " + msHumanReadable(futureElapsedTimeMs), futureElapsedTimeMs);
-
- } else {
- result = new ExecutionResult(future.getHealthCheckMetadata(), Result.Status.CRITICAL,
- "Timeout: Check still running after " + msHumanReadable(futureElapsedTimeMs)
- + " (exceeding the configured threshold for CRITICAL: "
- + msHumanReadable(this.longRunningFutureThresholdForRedMs) + ")", futureElapsedTimeMs);
- }
- }
resultsFromFutures.add(result);
futuresIt.remove();
}
@@ -323,6 +357,49 @@ public class HealthCheckExecutorImpl imp
results.addAll(resultsFromFutures);
}
+ HealthCheckExecutionResult collectResultFromFuture(final HealthCheckFuture future) {
+
+ HealthCheckExecutionResult result;
+ if (future.isDone()) {
+ logger.debug("Health Check is done: {}", future.getHealthCheckMetadata());
+
+ try {
+ result = future.get();
+ } catch (Exception e) {
+ logger.warn("Unexpected Exception during future.get(): " + e, e);
+ result = new ExecutionResult(future.getHealthCheckMetadata(), Result.Status.HEALTH_CHECK_ERROR,
+ "Unexpected Exception during future.get(): " + e);
+ }
+
+ // if the future came from a previous call remove it from stillRunningFutures
+ if (this.stillRunningFutures.containsKey(future.getHealthCheckMetadata())) {
+ this.stillRunningFutures.remove(future.getHealthCheckMetadata());
+ }
+
+ } else {
+ logger.debug("Health Check timed out: {}", future.getHealthCheckMetadata());
+ // Futures must not be cancelled as interrupting a health check might could cause a corrupted repository index
+ // (CrxRoundtripCheck) or ugly messages/stack traces in the log file
+
+ this.stillRunningFutures.put(future.getHealthCheckMetadata(), future);
+
+ // normally we turn the check into WARN (normal timeout), but if the threshold time for CRITICAL is reached for a certain
+ // future we turn the result CRITICAL
+ long futureElapsedTimeMs = new Date().getTime() - future.getCreatedTime().getTime();
+ if (futureElapsedTimeMs < this.longRunningFutureThresholdForRedMs) {
+ result = new ExecutionResult(future.getHealthCheckMetadata(), Result.Status.WARN,
+ "Timeout: Check still running after " + msHumanReadable(futureElapsedTimeMs), futureElapsedTimeMs);
+
+ } else {
+ result = new ExecutionResult(future.getHealthCheckMetadata(), Result.Status.CRITICAL,
+ "Timeout: Check still running after " + msHumanReadable(futureElapsedTimeMs)
+ + " (exceeding the configured threshold for CRITICAL: "
+ + msHumanReadable(this.longRunningFutureThresholdForRedMs) + ")", futureElapsedTimeMs);
+ }
+ }
+ return result;
+ }
+
static String msHumanReadable(final long millis) {
double number = millis;
Modified: sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckResultCache.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckResultCache.java?rev=1557667&r1=1557666&r2=1557667&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckResultCache.java (original)
+++ sling/trunk/bundles/extensions/healthcheck/core/src/main/java/org/apache/sling/hc/core/impl/executor/HealthCheckResultCache.java Mon Jan 13 09:01:27 2014
@@ -33,36 +33,47 @@ import org.slf4j.LoggerFactory;
/**
* Caches health check results.
- *
*/
public class HealthCheckResultCache {
+ /**
+ * The logger.
+ */
private final Logger logger = LoggerFactory.getLogger(this.getClass());
+ /**
+ * The map holding the cached results.
+ */
private final Map<Long, HealthCheckExecutionResult> cache = new ConcurrentHashMap<Long, HealthCheckExecutionResult>();
- @Override
- public String toString() {
- return "[HealthCheckResultCache size=" + cache.size() + "]";
- }
-
+ /**
+ * Update the cache with the results
+ */
public void updateWith(final Collection<HealthCheckExecutionResult> results) {
for (final HealthCheckExecutionResult result : results) {
- final ExecutionResult executionResult = (ExecutionResult) result;
- cache.put(executionResult.getServiceId(), result);
+ this.updateWith(result);
}
}
+ /**
+ * Update the cache with the result
+ */
+ public void updateWith(HealthCheckExecutionResult result) {
+ final ExecutionResult executionResult = (ExecutionResult) result;
+ cache.put(executionResult.getServiceId(), result);
+ }
+
+ /**
+ * Get the valid cache results
+ */
public void useValidCacheResults(final List<HealthCheckMetadata> metadatas,
final Collection<HealthCheckExecutionResult> results,
final long resultCacheTtlInMs) {
-
-
final Set<HealthCheckExecutionResult> cachedResults = new TreeSet<HealthCheckExecutionResult>();
final Iterator<HealthCheckMetadata> checksIt = metadatas.iterator();
while (checksIt.hasNext()) {
- final HealthCheckMetadata descriptor = checksIt.next();
- final HealthCheckExecutionResult result = get(descriptor, resultCacheTtlInMs);
+ final HealthCheckMetadata md = checksIt.next();
+ final HealthCheckExecutionResult result = useValidCacheResults(md, resultCacheTtlInMs);
if (result != null) {
cachedResults.add(result);
checksIt.remove();
@@ -72,6 +83,14 @@ public class HealthCheckResultCache {
results.addAll(cachedResults);
}
+ /**
+ * Return the cached result if it's still valid.
+ */
+ public HealthCheckExecutionResult useValidCacheResults(final HealthCheckMetadata metadata,
+ final long resultCacheTtlInMs) {
+ return get(metadata, resultCacheTtlInMs);
+ }
+
private HealthCheckExecutionResult get(final HealthCheckMetadata metadata, final long resultCacheTtlInMs) {
final Long key = metadata.getServiceId();
final HealthCheckExecutionResult cachedResult = cache.get(key);
@@ -99,4 +118,23 @@ public class HealthCheckResultCache {
return null;
}
+ /**
+ * Clear the whole cache
+ */
+ public void clear() {
+ this.cache.clear();
+ }
+
+ /**
+ * Remove entry from cache
+ */
+ public void removeCachedResult(final Long serviceId) {
+ this.cache.remove(serviceId);
+ }
+
+ @Override
+ public String toString() {
+ return "[HealthCheckResultCache size=" + cache.size() + "]";
+ }
+
}