You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2015/04/07 12:25:39 UTC
svn commit: r1671790 - in
/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs:
config/ tasks/
Author: cziegeler
Date: Tue Apr 7 10:25:38 2015
New Revision: 1671790
URL: http://svn.apache.org/r1671790
Log:
SLING-4582 : Maintenance tasks should guard against NPE
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CleanUpTask.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java?rev=1671790&r1=1671789&r2=1671790&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java Tue Apr 7 10:25:38 2015
@@ -243,16 +243,19 @@ public class JobManagerConfiguration imp
/**
* Create a new resource resolver for reading and writing the resource tree.
* The resolver needs to be closed by the client.
- * @return A resource resolver
+ * @return A resource resolver or {@code null} if the component is already deactivated.
* @throws RuntimeException if the resolver can't be created.
*/
public ResourceResolver createResourceResolver() {
ResourceResolver resolver = null;
- try {
- resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
- } catch ( final LoginException le) {
- logger.error("Unable to create new resource resolver: " + le.getMessage(), le);
- throw new RuntimeException(le);
+ final ResourceResolverFactory factory = this.resourceResolverFactory; // read the field once so the null check and the call use the same reference
+ if ( factory != null ) {
+ try {
+ resolver = factory.getAdministrativeResourceResolver(null);
+ } catch ( final LoginException le) {
+ logger.error("Unable to create new resource resolver: " + le.getMessage(), le);
+ throw new RuntimeException(le);
+ }
}
return resolver;
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java?rev=1671790&r1=1671789&r2=1671790&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java Tue Apr 7 10:25:38 2015
@@ -32,6 +32,7 @@ import org.apache.sling.discovery.Instan
import org.apache.sling.event.impl.jobs.JobImpl;
import org.apache.sling.event.impl.jobs.JobTopicTraverser;
import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
import org.apache.sling.event.impl.support.ResourceHelper;
@@ -74,25 +75,27 @@ public class CheckTopologyTask {
if ( caps.isLeader() && caps.isActive() ) {
this.logger.debug("Checking for stopped instances...");
final ResourceResolver resolver = this.configuration.createResourceResolver();
- try {
- final Resource jobsRoot = resolver.getResource(this.configuration.getAssginedJobsPath());
- this.logger.debug("Got jobs root {}", jobsRoot);
-
- // this resource should exist, but we check anyway
- if ( jobsRoot != null ) {
- final Iterator<Resource> instanceIter = jobsRoot.listChildren();
- while ( caps.isActive() && instanceIter.hasNext() ) {
- final Resource instanceResource = instanceIter.next();
-
- final String instanceId = instanceResource.getName();
- if ( !caps.isActive(instanceId) ) {
- logger.debug("Found stopped instance {}", instanceId);
- assignJobs(instanceResource, true);
+ if ( resolver != null ) {
+ try {
+ final Resource jobsRoot = resolver.getResource(this.configuration.getAssginedJobsPath());
+ this.logger.debug("Got jobs root {}", jobsRoot);
+
+ // this resource should exist, but we check anyway
+ if ( jobsRoot != null ) {
+ final Iterator<Resource> instanceIter = jobsRoot.listChildren();
+ while ( caps.isActive() && instanceIter.hasNext() ) {
+ final Resource instanceResource = instanceIter.next();
+
+ final String instanceId = instanceResource.getName();
+ if ( !caps.isActive(instanceId) ) {
+ logger.debug("Found stopped instance {}", instanceId);
+ assignJobs(instanceResource, true);
+ }
}
}
+ } finally {
+ resolver.close();
}
- } finally {
- resolver.close();
}
}
}
@@ -104,85 +107,91 @@ public class CheckTopologyTask {
if ( caps.isActive() ) {
this.logger.debug("Checking for stale jobs...");
final ResourceResolver resolver = this.configuration.createResourceResolver();
- try {
- final Resource jobsRoot = resolver.getResource(this.configuration.getLocalJobsPath());
-
- // this resource should exist, but we check anyway
- if ( jobsRoot != null ) {
- // check if this instance supports bridged jobs
- final List<InstanceDescription> bridgedTargets = caps.getPotentialTargets("/", null);
- boolean flag = false;
- for(final InstanceDescription desc : bridgedTargets) {
- if ( desc.isLocal() ) {
- flag = true;
- break;
- }
- }
- final boolean supportsBridged = flag;
-
- final Iterator<Resource> topicIter = jobsRoot.listChildren();
- while ( caps.isActive() && topicIter.hasNext() ) {
- final Resource topicResource = topicIter.next();
-
- final String topicName = topicResource.getName().replace('.', '/');
- this.logger.debug("Checking topic {}..." , topicName);
- final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(topicName, null);
- boolean reassign = true;
- for(final InstanceDescription desc : potentialTargets) {
+ if ( resolver != null ) {
+ try {
+ final Resource jobsRoot = resolver.getResource(this.configuration.getLocalJobsPath());
+
+ // this resource should exist, but we check anyway
+ if ( jobsRoot != null ) {
+ // check if this instance supports bridged jobs
+ final List<InstanceDescription> bridgedTargets = caps.getPotentialTargets("/", null);
+ boolean flag = false;
+ for(final InstanceDescription desc : bridgedTargets) {
if ( desc.isLocal() ) {
- reassign = false;
+ flag = true;
break;
}
}
- if ( reassign ) {
- final QueueInfo info = this.configuration.getQueueConfigurationManager().getQueueInfo(topicName);
- JobTopicTraverser.traverse(this.logger, topicResource, new JobTopicTraverser.ResourceCallback() {
-
- @Override
- public boolean handle(final Resource rsrc) {
- try {
- final ValueMap vm = ResourceHelper.getValueMap(rsrc);
- if ( !supportsBridged || vm.get(JobImpl.PROPERTY_BRIDGED_EVENT) == null ) {
- final String targetId = caps.detectTarget(topicName, vm, info);
-
- final Map<String, Object> props = new HashMap<String, Object>(vm);
- props.remove(Job.PROPERTY_JOB_STARTED_TIME);
-
- final String newPath;
- if ( targetId != null ) {
- newPath = configuration.getAssginedJobsPath() + '/' + targetId + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
- props.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName);
- props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
- } else {
- newPath = configuration.getUnassignedJobsPath() + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
- props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
- props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
- }
- try {
- ResourceHelper.getOrCreateResource(resolver, newPath, props);
- resolver.delete(rsrc);
- resolver.commit();
- } catch ( final PersistenceException pe ) {
- logger.warn("Unable to move stale job from " + rsrc.getPath() + " to " + newPath, pe);
- resolver.refresh();
- resolver.revert();
+ final boolean supportsBridged = flag;
+
+ final Iterator<Resource> topicIter = jobsRoot.listChildren();
+ while ( caps.isActive() && topicIter.hasNext() ) {
+ final Resource topicResource = topicIter.next();
+
+ final String topicName = topicResource.getName().replace('.', '/');
+ this.logger.debug("Checking topic {}..." , topicName);
+ final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(topicName, null);
+ boolean reassign = true;
+ for(final InstanceDescription desc : potentialTargets) {
+ if ( desc.isLocal() ) {
+ reassign = false;
+ break;
+ }
+ }
+ if ( reassign ) {
+ final QueueConfigurationManager qcm = this.configuration.getQueueConfigurationManager();
+ if ( qcm == null ) {
+ break;
+ }
+ final QueueInfo info = qcm.getQueueInfo(topicName);
+ JobTopicTraverser.traverse(this.logger, topicResource, new JobTopicTraverser.ResourceCallback() {
+
+ @Override
+ public boolean handle(final Resource rsrc) {
+ try {
+ final ValueMap vm = ResourceHelper.getValueMap(rsrc);
+ if ( !supportsBridged || vm.get(JobImpl.PROPERTY_BRIDGED_EVENT) == null ) {
+ final String targetId = caps.detectTarget(topicName, vm, info);
+
+ final Map<String, Object> props = new HashMap<String, Object>(vm);
+ props.remove(Job.PROPERTY_JOB_STARTED_TIME);
+
+ final String newPath;
+ if ( targetId != null ) {
+ newPath = configuration.getAssginedJobsPath() + '/' + targetId + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
+ props.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName);
+ props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
+ } else {
+ newPath = configuration.getUnassignedJobsPath() + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
+ props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
+ props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
+ }
+ try {
+ ResourceHelper.getOrCreateResource(resolver, newPath, props);
+ resolver.delete(rsrc);
+ resolver.commit();
+ } catch ( final PersistenceException pe ) {
+ logger.warn("Unable to move stale job from " + rsrc.getPath() + " to " + newPath, pe);
+ resolver.refresh();
+ resolver.revert();
+ }
}
+ } catch (final InstantiationException ie) {
+ // something happened with the resource in the meantime
+ logger.warn("Unable to move stale job from " + rsrc.getPath(), ie);
+ resolver.refresh();
+ resolver.revert();
}
- } catch (final InstantiationException ie) {
- // something happened with the resource in the meantime
- logger.warn("Unable to move stale job from " + rsrc.getPath(), ie);
- resolver.refresh();
- resolver.revert();
+ return caps.isActive();
}
- return caps.isActive();
- }
- });
+ });
+ }
}
}
+ } finally {
+ resolver.close();
}
- } finally {
- resolver.close();
}
}
}
@@ -197,16 +206,18 @@ public class CheckTopologyTask {
if ( caps.isLeader() && caps.isActive() ) {
logger.debug("Checking unassigned jobs...");
final ResourceResolver resolver = this.configuration.createResourceResolver();
- try {
- final Resource unassignedRoot = resolver.getResource(this.configuration.getUnassignedJobsPath());
- logger.debug("Got unassigned root {}", unassignedRoot);
-
- // this resource should exist, but we check anyway
- if ( unassignedRoot != null ) {
- assignJobs(unassignedRoot, false);
+ if ( resolver != null ) {
+ try {
+ final Resource unassignedRoot = resolver.getResource(this.configuration.getUnassignedJobsPath());
+ logger.debug("Got unassigned root {}", unassignedRoot);
+
+ // this resource should exist, but we check anyway
+ if ( unassignedRoot != null ) {
+ assignJobs(unassignedRoot, false);
+ }
+ } finally {
+ resolver.close();
}
- } finally {
- resolver.close();
}
}
}
@@ -234,7 +245,11 @@ public class CheckTopologyTask {
// first check if there is an instance for these topics
final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(topicName, BRIDGED_JOB);
if ( potentialTargets != null && potentialTargets.size() > 0 ) {
- final QueueInfo info = this.configuration.getQueueConfigurationManager().getQueueInfo(topicName);
+ final QueueConfigurationManager qcm = this.configuration.getQueueConfigurationManager();
+ if ( qcm == null ) {
+ break;
+ }
+ final QueueInfo info = qcm.getQueueInfo(topicName);
logger.debug("Found queue {} for {}", info.queueConfiguration, topicName);
JobTopicTraverser.traverse(this.logger, topicResource, new JobTopicTraverser.ResourceCallback() {
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CleanUpTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CleanUpTask.java?rev=1671790&r1=1671789&r2=1671790&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CleanUpTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CleanUpTask.java Tue Apr 7 10:25:38 2015
@@ -102,40 +102,42 @@ public class CleanUpTask {
this.logger.debug("Cleaning up job resource tree: removing obsolete locks");
final List<Resource> candidates = new ArrayList<Resource>();
final ResourceResolver resolver = this.configuration.createResourceResolver();
- try {
- final Resource parentResource = resolver.getResource(this.configuration.getLocksPath());
- final Calendar startDate = Calendar.getInstance();
- startDate.add(Calendar.MINUTE, -2);
-
- this.lockCleanup(caps, candidates, parentResource, startDate);
- final BatchResourceRemover remover = new BatchResourceRemover();
- boolean batchRemove = true;
- for(final Resource lockResource : candidates) {
- if ( caps.isActive() ) {
- try {
- if ( batchRemove ) {
- remover.delete(lockResource);
- } else {
- resolver.delete(lockResource);
- resolver.commit();
+ if ( resolver != null ) {
+ try {
+ final Resource parentResource = resolver.getResource(this.configuration.getLocksPath());
+ final Calendar startDate = Calendar.getInstance();
+ startDate.add(Calendar.MINUTE, -2);
+
+ this.lockCleanup(caps, candidates, parentResource, startDate);
+ final BatchResourceRemover remover = new BatchResourceRemover();
+ boolean batchRemove = true;
+ for(final Resource lockResource : candidates) {
+ if ( caps.isActive() ) {
+ try {
+ if ( batchRemove ) {
+ remover.delete(lockResource);
+ } else {
+ resolver.delete(lockResource);
+ resolver.commit();
+ }
+ } catch ( final PersistenceException pe) {
+ batchRemove = false;
+ this.ignoreException(pe);
+ resolver.refresh();
}
- } catch ( final PersistenceException pe) {
- batchRemove = false;
- this.ignoreException(pe);
- resolver.refresh();
+ } else {
+ break;
}
- } else {
- break;
}
+ try {
+ resolver.commit();
+ } catch ( final PersistenceException pe) {
+ this.ignoreException(pe);
+ resolver.refresh();
+ }
+ } finally {
+ resolver.close();
}
- try {
- resolver.commit();
- } catch ( final PersistenceException pe) {
- this.ignoreException(pe);
- resolver.refresh();
- }
- } finally {
- resolver.close();
}
}
}
@@ -235,6 +237,9 @@ public class CleanUpTask {
private void fullEmptyFolderCleanup(final TopologyCapabilities caps, final String basePath) {
this.logger.debug("Cleaning up job resource tree: removing ALL empty folders");
final ResourceResolver resolver = this.configuration.createResourceResolver();
+ if ( resolver == null ) {
+ return;
+ }
try {
final Resource baseResource = resolver.getResource(basePath);
// sanity check - should never be null
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java?rev=1671790&r1=1671789&r2=1671790&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java Tue Apr 7 10:25:38 2015
@@ -61,6 +61,9 @@ public class FindUnfinishedJobsTask {
private void initialScan() {
logger.debug("Scanning repository for unfinished jobs...");
final ResourceResolver resolver = configuration.createResourceResolver();
+ if ( resolver == null ) {
+ return;
+ }
try {
final Resource baseResource = resolver.getResource(configuration.getLocalJobsPath());
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java?rev=1671790&r1=1671789&r2=1671790&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java Tue Apr 7 10:25:38 2015
@@ -32,6 +32,7 @@ import org.apache.sling.discovery.Instan
import org.apache.sling.event.impl.jobs.JobImpl;
import org.apache.sling.event.impl.jobs.JobTopicTraverser;
import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
import org.apache.sling.event.impl.support.Environment;
@@ -83,19 +84,21 @@ public class UpgradeTask {
private void upgradeBridgedJobs() {
final String path = configuration.getLocalJobsPath() + '/' + JobImpl.PROPERTY_BRIDGED_EVENT;
final ResourceResolver resolver = configuration.createResourceResolver();
- try {
- final Resource rootResource = resolver.getResource(path);
- if ( rootResource != null ) {
- upgradeBridgedJobs(rootResource);
- }
- if ( caps.isLeader() ) {
- final Resource unassignedRoot = resolver.getResource(configuration.getUnassignedJobsPath() + '/' + JobImpl.PROPERTY_BRIDGED_EVENT);
- if ( unassignedRoot != null ) {
- upgradeBridgedJobs(unassignedRoot);
+ if ( resolver != null ) { // createResourceResolver() may return null; nothing is upgraded in that case
+ try {
+ final Resource rootResource = resolver.getResource(path);
+ if ( rootResource != null ) {
+ upgradeBridgedJobs(rootResource);
+ }
+ if ( caps.isLeader() ) { // the unassigned bridged jobs are only handled when caps reports this instance as leader
+ final Resource unassignedRoot = resolver.getResource(configuration.getUnassignedJobsPath() + '/' + JobImpl.PROPERTY_BRIDGED_EVENT);
+ if ( unassignedRoot != null ) {
+ upgradeBridgedJobs(unassignedRoot);
+ }
}
+ } finally {
+ resolver.close();
}
- } finally {
- resolver.close();
}
}
@@ -105,7 +108,11 @@ public class UpgradeTask {
*/
private void upgradeBridgedJobs(final Resource topicResource) {
final String topicName = topicResource.getName().replace('.', '/');
- final QueueInfo info = configuration.getQueueConfigurationManager().getQueueInfo(topicName);
+ final QueueConfigurationManager qcm = configuration.getQueueConfigurationManager();
+ if ( qcm == null ) {
+ return;
+ }
+ final QueueInfo info = qcm.getQueueInfo(topicName);
JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.ResourceCallback() {
@Override
@@ -150,13 +157,15 @@ public class UpgradeTask {
*/
private void processJobsFromPreviousVersions() {
final ResourceResolver resolver = configuration.createResourceResolver();
- try {
- this.processJobsFromPreviousVersions(resolver.getResource(configuration.getPreviousVersionAnonPath()));
- this.processJobsFromPreviousVersions(resolver.getResource(configuration.getPreviousVersionIdentifiedPath()));
- } catch ( final PersistenceException pe ) {
- this.logger.warn("Problems moving jobs from previous version.", pe);
- } finally {
- resolver.close();
+ if ( resolver != null ) { // createResourceResolver() may return null; the method then does nothing
+ try {
+ this.processJobsFromPreviousVersions(resolver.getResource(configuration.getPreviousVersionAnonPath()));
+ this.processJobsFromPreviousVersions(resolver.getResource(configuration.getPreviousVersionIdentifiedPath()));
+ } catch ( final PersistenceException pe ) {
+ this.logger.warn("Problems moving jobs from previous version.", pe);
+ } finally {
+ resolver.close();
+ }
}
}
@@ -237,7 +246,12 @@ public class UpgradeTask {
final List<InstanceDescription> potentialTargets = caps.getPotentialTargets("/", null);
String targetId = null;
if ( potentialTargets != null && potentialTargets.size() > 0 ) {
- final QueueInfo info = configuration.getQueueConfigurationManager().getQueueInfo(topic);
+ final QueueConfigurationManager qcm = configuration.getQueueConfigurationManager();
+ if ( qcm == null ) {
+ resolver.revert();
+ return;
+ }
+ final QueueInfo info = qcm.getQueueInfo(topic);
logger.debug("Found queue {} for {}", info.queueConfiguration, topic);
targetId = caps.detectTarget(topic, vm, info);
if ( targetId != null ) {