You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/10/15 13:30:10 UTC
svn commit: r1631994 - in
/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl:
jobs/ jobs/config/ jobs/topics/ topology/
Author: cziegeler
Date: Wed Oct 15 11:30:09 2014
New Revision: 1631994
URL: http://svn.apache.org/r1631994
Log:
SLING-4048 : Avoid keeping jobs in memory. Move topology handling to own handler service
Added:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/MaintenanceTask.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/RestartTask.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/UpgradeTask.java (with props)
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/TopologyCapabilities.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/TopologyHandler.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1631994&r1=1631993&r2=1631994&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Wed Oct 15 11:30:09 2014
@@ -117,10 +117,6 @@ public class JobManagerImpl
@Reference
private EventAdmin eventAdmin;
- /** The configuration manager. */
- @Reference
- private QueueConfigurationManager queueConfigManager;
-
@Reference
private Scheduler scheduler;
@@ -137,6 +133,8 @@ public class JobManagerImpl
@Reference
private JobManagerConfiguration configuration;
+ @Reference
+ private QueueConfigurationManager queueManager;
private volatile TopologyCapabilities topologyCapabilities;
@@ -248,7 +246,7 @@ public class JobManagerImpl
// invoke maintenance task
final MaintenanceTask task = this.maintenanceTask;
if ( task != null ) {
- task.run(this.topologyCapabilities, this.queueConfigManager, this.schedulerRuns - 1);
+ task.run(this.topologyCapabilities, this.schedulerRuns - 1);
}
logger.debug("Job manager maintenance: Finished #{}", this.schedulerRuns);
}
@@ -270,7 +268,11 @@ public class JobManagerImpl
}
// get the queue configuration
- final QueueInfo queueInfo = queueConfigManager.getQueueInfo(job.getTopic());
+ final TopologyCapabilities caps = this.topologyCapabilities;
+ final QueueInfo queueInfo = caps != null ? caps.getQueueInfo(job.getTopic()) : null;
+ if ( queueInfo == null ) {
+ return; // TODO
+ }
final InternalQueueConfiguration config = queueInfo.queueConfiguration;
// Sanity check if queue configuration has changed
@@ -288,7 +290,6 @@ public class JobManagerImpl
} else {
if ( reassign ) {
- final TopologyCapabilities caps = this.topologyCapabilities;
reassignTargetId = (caps == null ? null : caps.detectTarget(job.getTopic(), job.getProperties(), queueInfo));
} else {
@@ -1167,7 +1168,7 @@ public class JobManagerImpl
final String jobName,
final Map<String, Object> jobProperties,
final List<String> errors) {
- final QueueInfo info = this.queueConfigManager.getQueueInfo(jobTopic);
+ final QueueInfo info = this.queueManager.getQueueInfo(jobTopic);
if ( info.queueConfiguration.getType() == QueueConfiguration.Type.DROP ) {
if ( logger.isDebugEnabled() ) {
logger.debug("Dropping job due to configuration of queue {} : {}", info.queueName, Utility.toString(jobTopic, jobName, jobProperties));
@@ -1280,7 +1281,7 @@ public class JobManagerImpl
}
public void reassign(final JobImpl job) {
- final QueueInfo queueInfo = queueConfigManager.getQueueInfo(job.getTopic());
+ final QueueInfo queueInfo = queueManager.getQueueInfo(job.getTopic());
final InternalQueueConfiguration config = queueInfo.queueConfiguration;
// Sanity check if queue configuration has changed
@@ -1351,7 +1352,7 @@ public class JobManagerImpl
final JobImpl job = (JobImpl)this.getJobById(jobId);
if ( job != null && !this.configuration.isStoragePath(job.getResourcePath()) ) {
// get the queue configuration
- final QueueInfo queueInfo = queueConfigManager.getQueueInfo(job.getTopic());
+ final QueueInfo queueInfo = this.queueManager.getQueueInfo(job.getTopic());
final AbstractJobQueue queue;
synchronized ( queuesLock ) {
queue = this.queues.get(queueInfo.queueName);
@@ -1361,6 +1362,7 @@ public class JobManagerImpl
stopped = queue.stopJob(job);
}
if ( forward && !stopped ) {
+ // TODO why not remove the resource?
// send remote event
final Map<String, Object> props = new HashMap<String, Object>();
props.put(Utility.PROPERTY_ID, jobId);
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java?rev=1631994&r1=1631993&r2=1631994&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java Wed Oct 15 11:30:09 2014
@@ -18,8 +18,6 @@
*/
package org.apache.sling.event.impl.jobs;
-import java.io.IOException;
-import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
@@ -32,15 +30,10 @@ import org.apache.sling.api.resource.Res
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceUtil;
import org.apache.sling.api.resource.ValueMap;
-import org.apache.sling.discovery.InstanceDescription;
-import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
-import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
import org.apache.sling.event.impl.support.BatchResourceRemover;
-import org.apache.sling.event.impl.support.Environment;
import org.apache.sling.event.impl.support.ResourceHelper;
import org.apache.sling.event.impl.topology.TopologyCapabilities;
import org.apache.sling.event.jobs.Job;
-import org.apache.sling.event.jobs.QueueConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,14 +50,6 @@ public class MaintenanceTask {
/** Job manager configuration. */
private final JobManagerConfiguration configuration;
- /** Change count for queue configurations .*/
- private volatile long queueConfigChangeCount = -1;
-
- /** Change count for topology changes .*/
- private volatile long topologyChangeCount = -1;
-
- private boolean checkedForPreviousVersion = false;
-
/**
* Constructor
*/
@@ -72,241 +57,11 @@ public class MaintenanceTask {
this.configuration = config;
}
- private void reassignJobs(final TopologyCapabilities caps,
- final QueueConfigurationManager queueManager) {
- if ( caps != null && caps.isLeader() ) {
- this.logger.debug("Checking for stopped instances...");
- final ResourceResolver resolver = this.configuration.createResourceResolver();
- try {
- final Resource jobsRoot = resolver.getResource(this.configuration.getAssginedJobsPath());
- this.logger.debug("Got jobs root {}", jobsRoot);
-
- // this resource should exist, but we check anyway
- if ( jobsRoot != null ) {
- final Iterator<Resource> instanceIter = jobsRoot.listChildren();
- while ( caps.isActive() && instanceIter.hasNext() ) {
- final Resource instanceResource = instanceIter.next();
-
- final String instanceId = instanceResource.getName();
- if ( !caps.isActive(instanceId) ) {
- logger.debug("Found stopped instance {}", instanceId);
- assignJobs(caps, queueManager, instanceResource, true);
- }
- }
- }
- } finally {
- resolver.close();
- }
- }
- }
-
- /**
- * Try to assign unassigned jobs as there might be changes in:
- * - queue configurations
- * - topology
- * - capabilities
- */
- private void assignUnassignedJobs(final TopologyCapabilities caps,
- final QueueConfigurationManager queueManager) {
- if ( caps != null && caps.isLeader() ) {
- logger.debug("Checking unassigned jobs...");
- final ResourceResolver resolver = this.configuration.createResourceResolver();
- try {
- final Resource unassignedRoot = resolver.getResource(this.configuration.getUnassignedJobsPath());
- logger.debug("Got unassigned root {}", unassignedRoot);
-
- // this resource should exist, but we check anyway
- if ( unassignedRoot != null ) {
- assignJobs(caps, queueManager, unassignedRoot, false);
- }
- } finally {
- resolver.close();
- }
- }
- }
-
- /**
- * Try to assign all jobs from the jobs root.
- * The jobs are stored by topic
- */
- private void assignJobs(final TopologyCapabilities caps,
- final QueueConfigurationManager queueManager,
- final Resource jobsRoot,
- final boolean unassign) {
- final ResourceResolver resolver = jobsRoot.getResourceResolver();
-
- final Iterator<Resource> topicIter = jobsRoot.listChildren();
- while ( caps.isActive() && topicIter.hasNext() ) {
- final Resource topicResource = topicIter.next();
-
- final String topicName = topicResource.getName().replace('.', '/');
- logger.debug("Found topic {}", topicName);
-
- final String checkTopic;
- if ( topicName.equals(JobImpl.PROPERTY_BRIDGED_EVENT) ) {
- checkTopic = "/";
- } else {
- checkTopic = topicName;
- }
-
- // first check if there is an instance for these topics
- final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(checkTopic, null);
- if ( potentialTargets != null && potentialTargets.size() > 0 ) {
- final QueueInfo info = queueManager.getQueueInfo(topicName);
- logger.debug("Found queue {} for {}", info.queueConfiguration, topicName);
-
- // if queue is configured to drop, we drop
- if ( info.queueConfiguration.getType() == QueueConfiguration.Type.DROP) {
- final Iterator<Resource> i = topicResource.listChildren();
- while ( caps.isActive() && i.hasNext() ) {
- final Resource rsrc = i.next();
- try {
- resolver.delete(rsrc);
- resolver.commit();
- } catch ( final PersistenceException pe ) {
- this.ignoreException(pe);
- resolver.refresh();
- }
- }
- } else if ( info.queueConfiguration.getType() != QueueConfiguration.Type.IGNORE ) {
- // if the queue is not configured to ignore, we can reschedule
- for(final Resource yearResource : topicResource.getChildren() ) {
- for(final Resource monthResource : yearResource.getChildren() ) {
- for(final Resource dayResource : monthResource.getChildren() ) {
- for(final Resource hourResource : dayResource.getChildren() ) {
- for(final Resource minuteResource : hourResource.getChildren() ) {
- for(final Resource rsrc : minuteResource.getChildren() ) {
-
- if ( !caps.isActive() ) {
- return;
- }
-
- try {
- final ValueMap vm = ResourceHelper.getValueMap(rsrc);
- final String targetId = caps.detectTarget(topicName, vm, info);
-
- if ( targetId != null ) {
- final String newPath = this.configuration.getAssginedJobsPath() + '/' + targetId + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
- final Map<String, Object> props = new HashMap<String, Object>(vm);
- props.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName);
- props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
- props.remove(Job.PROPERTY_JOB_STARTED_TIME);
- try {
- ResourceHelper.getOrCreateResource(resolver, newPath, props);
- resolver.delete(rsrc);
- resolver.commit();
- } catch ( final PersistenceException pe ) {
- this.ignoreException(pe);
- resolver.refresh();
- }
- }
- } catch (final InstantiationException ie) {
- // something happened with the resource in the meantime
- this.ignoreException(ie);
- resolver.refresh();
- }
- }
- }
- }
- }
- }
- }
- }
- }
- if ( caps.isActive() && unassign ) {
- // we have to move everything to the unassigned area
- for(final Resource yearResource : topicResource.getChildren() ) {
- for(final Resource monthResource : yearResource.getChildren() ) {
- for(final Resource dayResource : monthResource.getChildren() ) {
- for(final Resource hourResource : dayResource.getChildren() ) {
- for(final Resource minuteResource : hourResource.getChildren() ) {
- for(final Resource rsrc : minuteResource.getChildren() ) {
-
- if ( !caps.isActive() ) {
- return;
- }
-
- try {
- final ValueMap vm = ResourceHelper.getValueMap(rsrc);
- final String newPath = this.configuration.getUnassignedJobsPath() + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
- final Map<String, Object> props = new HashMap<String, Object>(vm);
- props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
- props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
- props.remove(Job.PROPERTY_JOB_STARTED_TIME);
-
- try {
- ResourceHelper.getOrCreateResource(resolver, newPath, props);
- resolver.delete(rsrc);
- resolver.commit();
- } catch ( final PersistenceException pe ) {
- this.ignoreException(pe);
- resolver.refresh();
- }
- } catch (final InstantiationException ie) {
- // something happened with the resource in the meantime
- this.ignoreException(ie);
- resolver.refresh();
- }
- }
- }
- }
- }
- }
- }
- }
- }
- }
-
- /**
- * Check if the topology has changed.
- */
- private boolean topologyHasChanged(final TopologyCapabilities topologyCapabilities) {
- boolean topologyChanged = false;
- if ( topologyCapabilities != null ) {
- if ( this.topologyChangeCount != topologyCapabilities.getChangeCount() ) {
- this.topologyChangeCount = topologyCapabilities.getChangeCount();
- topologyChanged = true;
- }
- }
- return topologyChanged;
- }
-
- private boolean queueConfigurationHasChanged(final TopologyCapabilities topologyCapabilities,
- final QueueConfigurationManager queueManager) {
- boolean configChanged = false;
- if ( topologyCapabilities != null ) {
- final int queueChangeCount = queueManager.getChangeCount();
- if ( this.queueConfigChangeCount < queueChangeCount ) {
- configChanged = true;
- this.queueConfigChangeCount = queueChangeCount;
- }
- }
- return configChanged;
- }
-
/**
* One maintenance run
*/
public void run(final TopologyCapabilities topologyCapabilities,
- final QueueConfigurationManager queueManager,
final long cleanUpCounter) {
- // check topology and config change during each invocation
- final boolean topologyChanged = this.topologyHasChanged(topologyCapabilities);
- final boolean configChanged = this.queueConfigurationHasChanged(topologyCapabilities, queueManager);
-
- // if topology changed, reschedule assigned jobs for stopped instances
- if ( topologyChanged ) {
- this.reassignJobs(topologyCapabilities, queueManager);
- }
- // try to assign unassigned jobs
- if ( topologyChanged || configChanged ) {
- this.assignUnassignedJobs(topologyCapabilities, queueManager);
- }
-
- if ( topologyChanged && !this.checkedForPreviousVersion && topologyCapabilities != null && topologyCapabilities.isLeader() ) {
- this.processJobsFromPreviousVersions(topologyCapabilities, queueManager);
- }
-
if ( topologyCapabilities != null ) {
// Clean up
final String cleanUpAssignedPath;;
@@ -641,138 +396,4 @@ public class MaintenanceTask {
this.logger.debug("Ignored exception " + e.getMessage(), e);
}
}
-
- /**
- * Handle jobs from previous versions (<= 3.1.4) by moving them to the unassigned area
- */
- private void processJobsFromPreviousVersions(final TopologyCapabilities caps,
- final QueueConfigurationManager queueManager) {
- final ResourceResolver resolver = this.configuration.createResourceResolver();
- try {
- this.processJobsFromPreviousVersions(caps, queueManager, resolver.getResource(this.configuration.getPreviousVersionAnonPath()));
- this.processJobsFromPreviousVersions(caps, queueManager, resolver.getResource(this.configuration.getPreviousVersionIdentifiedPath()));
- this.checkedForPreviousVersion = true;
- } catch ( final PersistenceException pe ) {
- this.logger.warn("Problems moving jobs from previous version.", pe);
- } finally {
- resolver.close();
- }
- }
-
- /**
- * Recursively find jobs and move them
- */
- private void processJobsFromPreviousVersions(final TopologyCapabilities caps,
- final QueueConfigurationManager queueManager,
- final Resource rsrc) throws PersistenceException {
- if ( rsrc != null && caps.isActive() ) {
- if ( rsrc.isResourceType(ResourceHelper.RESOURCE_TYPE_JOB) ) {
- this.moveJobFromPreviousVersion(caps, queueManager, rsrc);
- } else {
- for(final Resource child : rsrc.getChildren()) {
- this.processJobsFromPreviousVersions(caps, queueManager, child);
- }
- if ( caps.isActive() ) {
- rsrc.getResourceResolver().delete(rsrc);
- rsrc.getResourceResolver().commit();
- rsrc.getResourceResolver().refresh();
- }
- }
- }
- }
-
- /**
- * Move a single job
- */
- private void moveJobFromPreviousVersion(final TopologyCapabilities caps,
- final QueueConfigurationManager queueManager,
- final Resource jobResource)
- throws PersistenceException {
- final ResourceResolver resolver = jobResource.getResourceResolver();
-
- try {
- final ValueMap vm = ResourceHelper.getValueMap(jobResource);
- // check for binary properties
- Map<String, Object> binaryProperties = new HashMap<String, Object>();
- final ObjectInputStream ois = vm.get("slingevent:properties", ObjectInputStream.class);
- if ( ois != null ) {
- try {
- int length = ois.readInt();
- for(int i=0;i<length;i++) {
- final String key = (String)ois.readObject();
- final Object value = ois.readObject();
- binaryProperties.put(key, value);
- }
- } catch (final ClassNotFoundException cnfe) {
- throw new PersistenceException("Class not found.", cnfe);
- } catch (final java.io.InvalidClassException ice) {
- throw new PersistenceException("Invalid class.", ice);
- } catch (final IOException ioe) {
- throw new PersistenceException("Unable to deserialize job properties.", ioe);
- } finally {
- try {
- ois.close();
- } catch (final IOException ioe) {
- this.ignoreException(ioe);
- }
- }
- }
-
- final Map<String, Object> properties = ResourceHelper.cloneValueMap(vm);
-
- properties.put(JobImpl.PROPERTY_BRIDGED_EVENT, true);
- final String topic = (String)properties.remove("slingevent:topic");
- properties.put(ResourceHelper.PROPERTY_JOB_TOPIC, topic);
-
- properties.remove(Job.PROPERTY_JOB_QUEUE_NAME);
- properties.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
- // and binary properties
- properties.putAll(binaryProperties);
- properties.remove("slingevent:properties");
-
- if ( !properties.containsKey(Job.PROPERTY_JOB_RETRIES) ) {
- properties.put(Job.PROPERTY_JOB_RETRIES, 10); // we put a dummy value here; this gets updated by the queue
- }
- if ( !properties.containsKey(Job.PROPERTY_JOB_RETRY_COUNT) ) {
- properties.put(Job.PROPERTY_JOB_RETRY_COUNT, 0);
- }
-
- final List<InstanceDescription> potentialTargets = caps.getPotentialTargets("/", null);
- String targetId = null;
- if ( potentialTargets != null && potentialTargets.size() > 0 ) {
- final QueueInfo info = queueManager.getQueueInfo(topic);
- logger.debug("Found queue {} for {}", info.queueConfiguration, topic);
- // if queue is configured to drop, we drop
- if ( info.queueConfiguration.getType() == QueueConfiguration.Type.DROP) {
- resolver.delete(jobResource);
- resolver.commit();
- return;
- }
- if ( info.queueConfiguration.getType() != QueueConfiguration.Type.IGNORE ) {
- targetId = caps.detectTarget(topic, vm, info);
- if ( targetId != null ) {
- properties.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName);
- properties.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
- properties.put(Job.PROPERTY_JOB_RETRIES, info.queueConfiguration.getMaxRetries());
- }
- }
- }
-
- properties.put(Job.PROPERTY_JOB_CREATED_INSTANCE, "old:" + Environment.APPLICATION_ID);
- properties.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, ResourceHelper.RESOURCE_TYPE_JOB);
-
- final String jobId = this.configuration.getUniqueId(topic);
- properties.put(ResourceHelper.PROPERTY_JOB_ID, jobId);
- properties.remove(Job.PROPERTY_JOB_STARTED_TIME);
-
- final String newPath = this.configuration.getUniquePath(targetId, topic, jobId, vm);
- this.logger.debug("Moving 'old' job from {} to {}", jobResource.getPath(), newPath);
-
- ResourceHelper.getOrCreateResource(resolver, newPath, properties);
- resolver.delete(jobResource);
- resolver.commit();
- } catch (final InstantiationException ie) {
- throw new PersistenceException("Exception while reading reasource: " + ie.getMessage(), ie.getCause());
- }
- }
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java?rev=1631994&r1=1631993&r2=1631994&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java Wed Oct 15 11:30:09 2014
@@ -31,7 +31,9 @@ import org.apache.sling.api.resource.Log
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.event.impl.support.ResourceHelper;
import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
import org.osgi.util.tracker.ServiceTracker;
+import org.osgi.util.tracker.ServiceTrackerCustomizer;
/**
@@ -41,6 +43,10 @@ import org.osgi.util.tracker.ServiceTrac
@Service(value=QueueConfigurationManager.class)
public class QueueConfigurationManager {
+ public interface QueueConfigurationChangeListener {
+ void configChanged();
+ }
+
/** Configurations - ordered by service ranking. */
private volatile InternalQueueConfiguration[] orderedConfigs = new InternalQueueConfiguration[0];
@@ -53,6 +59,9 @@ public class QueueConfigurationManager {
@Reference
private MainQueueConfiguration mainQueueConfiguration;
+ /** Listeners. */
+ private final List<QueueConfigurationChangeListener> listeners = new ArrayList<QueueConfigurationChangeListener>();
+
/**
* Activate this component.
* Create the service tracker and start it.
@@ -61,7 +70,24 @@ public class QueueConfigurationManager {
protected void activate(final BundleContext bundleContext)
throws LoginException, PersistenceException {
this.configTracker = new ServiceTracker(bundleContext,
- InternalQueueConfiguration.class.getName(), null);
+ InternalQueueConfiguration.class.getName(), new ServiceTrackerCustomizer() {
+
+ @Override
+ public void removedService(final ServiceReference reference, final Object service) {
+ bundleContext.ungetService(reference);
+ updateListeners();
+ }
+
+ @Override
+ public void modifiedService(ServiceReference reference, Object service) {
+ // nothing to do
+ }
+
+ @Override
+ public Object addingService(final ServiceReference reference) {
+ return bundleContext.getService(reference);
+ }
+ });
this.configTracker.open();
}
@@ -145,4 +171,24 @@ public class QueueConfigurationManager {
public int getChangeCount() {
return this.configTracker.getTrackingCount();
}
+
+ public void addListener(final QueueConfigurationChangeListener listener) {
+ synchronized ( this.listeners ) {
+ this.listeners.add(listener);
+ }
+ }
+
+ public void removeListener(final QueueConfigurationChangeListener listener) {
+ synchronized ( this.listeners ) {
+ this.listeners.remove(listener);
+ }
+ }
+
+ private void updateListeners() {
+ synchronized ( listeners ) {
+ for(final QueueConfigurationChangeListener l : listeners) {
+ l.configChanged();
+ }
+ }
+ }
}
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java?rev=1631994&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java Wed Oct 15 11:30:09 2014
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.topics;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.event.impl.jobs.JobImpl;
+import org.apache.sling.event.impl.jobs.Utility;
+import org.slf4j.Logger;
+
+public class JobTopicTraverser {
+
+ public interface Handler {
+ boolean handle(final JobImpl job);
+ }
+
+ public static void traverse(final Logger logger,
+ final Resource topicResource,
+ final Handler handler) {
+ logger.debug("Processing topic {}", topicResource.getName());
+ // now years
+ for(final Resource yearResource: Utility.getSortedChildren(logger, "year", topicResource)) {
+ final int year = Integer.valueOf(yearResource.getName());
+ logger.debug("Processing year {}", year);
+
+ // now months
+ for(final Resource monthResource: Utility.getSortedChildren(logger, "month", yearResource)) {
+ final int month = Integer.valueOf(monthResource.getName());
+ logger.debug("Processing month {}", month);
+
+ // now days
+ for(final Resource dayResource: Utility.getSortedChildren(logger, "day", monthResource)) {
+ final int day = Integer.valueOf(dayResource.getName());
+ logger.debug("Processing day {}", day);
+
+ // now hours
+ for(final Resource hourResource: Utility.getSortedChildren(logger, "hour", dayResource)) {
+ final int hour = Integer.valueOf(hourResource.getName());
+ logger.debug("Processing hour {}", hour);
+
+ // now minutes
+ for(final Resource minuteResource: Utility.getSortedChildren(logger, "minute", hourResource)) {
+ final int minute = Integer.valueOf(minuteResource.getName());
+ logger.debug("Processing minute {}", minute);
+
+ // now jobs
+ final List<JobImpl> jobs = new ArrayList<JobImpl>();
+ final Iterator<Resource> jobIter = minuteResource.listChildren();
+ while ( jobIter.hasNext() ) {
+ final Resource jobResource = jobIter.next();
+
+ final JobImpl job = Utility.readJob(logger, jobResource);
+ if ( job != null ) {
+ logger.debug("Found job {}", jobResource.getName());
+ jobs.add(job);
+ }
+ }
+
+ Collections.sort(jobs);
+
+ for(final JobImpl job : jobs) {
+ if ( !handler.handle(job) ) {
+ return;
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+}
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/MaintenanceTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/MaintenanceTask.java?rev=1631994&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/MaintenanceTask.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/MaintenanceTask.java Wed Oct 15 11:30:09 2014
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.topology;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.event.impl.jobs.JobImpl;
+import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
+import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maintenance task that moves persisted jobs between the assigned and the
+ * unassigned job areas of the repository.
+ *
+ * All work is guarded by {@code caps.isLeader()}, so only the leader moves jobs:
+ * - jobs stored below an instance id for which {@code caps.isActive(instanceId)}
+ *   is false are reassigned (or moved to the unassigned area), and
+ * - unassigned jobs are assigned if a target instance can be detected.
+ *
+ * NOTE(review): the original comment states that this task runs every minute
+ * in the default configuration; the scheduling is not visible in this class.
+ */
+public class MaintenanceTask {
+
+    /** Logger. */
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    /** Job manager configuration. */
+    private final JobManagerConfiguration configuration;
+
+    /**
+     * Constructor
+     * @param config The job manager configuration, used to create resource
+     *               resolvers and to look up the job paths.
+     */
+    public MaintenanceTask(final JobManagerConfiguration config) {
+        this.configuration = config;
+    }
+
+    /**
+     * Reassign the jobs stored below every instance id which is not active
+     * in the given topology. Does nothing if there are no capabilities or
+     * if this instance is not the leader.
+     * @param caps The current topology capabilities (may be null)
+     * @param queueManager The queue configuration manager
+     */
+    private void reassignJobs(final TopologyCapabilities caps,
+            final QueueConfigurationManager queueManager) {
+        if ( caps != null && caps.isLeader() ) {
+            this.logger.debug("Checking for stopped instances...");
+            final ResourceResolver resolver = this.configuration.createResourceResolver();
+            try {
+                final Resource jobsRoot = resolver.getResource(this.configuration.getAssginedJobsPath());
+                this.logger.debug("Got jobs root {}", jobsRoot);
+
+                // this resource should exist, but we check anyway
+                if ( jobsRoot != null ) {
+                    final Iterator<Resource> instanceIter = jobsRoot.listChildren();
+                    while ( caps.isActive() && instanceIter.hasNext() ) {
+                        final Resource instanceResource = instanceIter.next();
+
+                        final String instanceId = instanceResource.getName();
+                        if ( !caps.isActive(instanceId) ) {
+                            logger.debug("Found stopped instance {}", instanceId);
+                            assignJobs(caps, queueManager, instanceResource, true);
+                        }
+                    }
+                }
+            } finally {
+                resolver.close();
+            }
+        }
+    }
+
+    /**
+     * Try to assign unassigned jobs as there might be changes in:
+     * - queue configurations
+     * - topology
+     * - capabilities
+     * Does nothing if there are no capabilities or if this instance is not the leader.
+     * @param caps The current topology capabilities (may be null)
+     * @param queueManager The queue configuration manager
+     */
+    private void assignUnassignedJobs(final TopologyCapabilities caps,
+            final QueueConfigurationManager queueManager) {
+        if ( caps != null && caps.isLeader() ) {
+            logger.debug("Checking unassigned jobs...");
+            final ResourceResolver resolver = this.configuration.createResourceResolver();
+            try {
+                final Resource unassignedRoot = resolver.getResource(this.configuration.getUnassignedJobsPath());
+                logger.debug("Got unassigned root {}", unassignedRoot);
+
+                // this resource should exist, but we check anyway
+                if ( unassignedRoot != null ) {
+                    assignJobs(caps, queueManager, unassignedRoot, false);
+                }
+            } finally {
+                resolver.close();
+            }
+        }
+    }
+
+    /**
+     * Try to assign all jobs from the jobs root.
+     * The jobs are stored by topic
+     * @param caps The current topology capabilities; processing stops as soon
+     *             as {@code caps.isActive()} returns false
+     * @param queueManager The queue configuration manager
+     * @param jobsRoot The resource whose children are the topic resources
+     * @param unassign If true, jobs which could not be assigned are moved to
+     *                 the unassigned area
+     */
+    private void assignJobs(final TopologyCapabilities caps,
+            final QueueConfigurationManager queueManager,
+            final Resource jobsRoot,
+            final boolean unassign) {
+        final ResourceResolver resolver = jobsRoot.getResourceResolver();
+
+        final Iterator<Resource> topicIter = jobsRoot.listChildren();
+        while ( caps.isActive() && topicIter.hasNext() ) {
+            final Resource topicResource = topicIter.next();
+
+            // the resource name uses '.' where the topic uses '/'
+            final String topicName = topicResource.getName().replace('.', '/');
+            logger.debug("Found topic {}", topicName);
+
+            final String checkTopic;
+            if ( topicName.equals(JobImpl.PROPERTY_BRIDGED_EVENT) ) {
+                checkTopic = "/";
+            } else {
+                checkTopic = topicName;
+            }
+
+            // first check if there is an instance for these topics
+            final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(checkTopic, null);
+            if ( potentialTargets != null && potentialTargets.size() > 0 ) {
+                final QueueInfo info = queueManager.getQueueInfo(topicName);
+                logger.debug("Found queue {} for {}", info.queueConfiguration, topicName);
+
+                // if queue is configured to drop, we drop
+                if ( info.queueConfiguration.getType() == QueueConfiguration.Type.DROP) {
+                    this.dropJobs(caps, resolver, topicResource);
+                } else if ( info.queueConfiguration.getType() != QueueConfiguration.Type.IGNORE ) {
+                    // if the queue is not configured to ignore, we can reschedule
+                    if ( !this.reassignTopic(caps, resolver, topicResource, topicName, info) ) {
+                        return;
+                    }
+                }
+            }
+            if ( caps.isActive() && unassign ) {
+                // we have to move everything to the unassigned area
+                if ( !this.unassignTopic(caps, resolver, topicResource) ) {
+                    return;
+                }
+            }
+        }
+    }
+
+    /**
+     * Delete all children of the topic resource, committing after each delete.
+     */
+    private void dropJobs(final TopologyCapabilities caps,
+            final ResourceResolver resolver,
+            final Resource topicResource) {
+        final Iterator<Resource> i = topicResource.listChildren();
+        while ( caps.isActive() && i.hasNext() ) {
+            final Resource rsrc = i.next();
+            try {
+                resolver.delete(rsrc);
+                resolver.commit();
+            } catch ( final PersistenceException pe ) {
+                this.ignoreException(pe);
+                this.discardPendingChanges(resolver);
+            }
+        }
+    }
+
+    /**
+     * Move every job of the topic for which a target instance can be detected
+     * below that instance in the assigned area.
+     * @return false if the capabilities became inactive and processing must stop
+     */
+    private boolean reassignTopic(final TopologyCapabilities caps,
+            final ResourceResolver resolver,
+            final Resource topicResource,
+            final String topicName,
+            final QueueInfo info) {
+        // jobs are stored below topic/year/month/day/hour/minute
+        for(final Resource yearResource : topicResource.getChildren() ) {
+            for(final Resource monthResource : yearResource.getChildren() ) {
+                for(final Resource dayResource : monthResource.getChildren() ) {
+                    for(final Resource hourResource : dayResource.getChildren() ) {
+                        for(final Resource minuteResource : hourResource.getChildren() ) {
+                            for(final Resource rsrc : minuteResource.getChildren() ) {
+
+                                if ( !caps.isActive() ) {
+                                    return false;
+                                }
+
+                                try {
+                                    final ValueMap vm = ResourceHelper.getValueMap(rsrc);
+                                    final String targetId = caps.detectTarget(topicName, vm, info);
+
+                                    if ( targetId != null ) {
+                                        final String newPath = this.configuration.getAssginedJobsPath() + '/' + targetId + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
+                                        final Map<String, Object> props = new HashMap<String, Object>(vm);
+                                        props.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName);
+                                        props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
+                                        props.remove(Job.PROPERTY_JOB_STARTED_TIME);
+                                        this.moveJob(resolver, rsrc, newPath, props);
+                                    }
+                                } catch (final InstantiationException ie) {
+                                    // something happened with the resource in the meantime
+                                    this.ignoreException(ie);
+                                    resolver.refresh();
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Move every job of the topic to the unassigned area, removing the
+     * queue name, target instance and started time properties.
+     * @return false if the capabilities became inactive and processing must stop
+     */
+    private boolean unassignTopic(final TopologyCapabilities caps,
+            final ResourceResolver resolver,
+            final Resource topicResource) {
+        // jobs are stored below topic/year/month/day/hour/minute
+        for(final Resource yearResource : topicResource.getChildren() ) {
+            for(final Resource monthResource : yearResource.getChildren() ) {
+                for(final Resource dayResource : monthResource.getChildren() ) {
+                    for(final Resource hourResource : dayResource.getChildren() ) {
+                        for(final Resource minuteResource : hourResource.getChildren() ) {
+                            for(final Resource rsrc : minuteResource.getChildren() ) {
+
+                                if ( !caps.isActive() ) {
+                                    return false;
+                                }
+
+                                try {
+                                    final ValueMap vm = ResourceHelper.getValueMap(rsrc);
+                                    final String newPath = this.configuration.getUnassignedJobsPath() + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
+                                    final Map<String, Object> props = new HashMap<String, Object>(vm);
+                                    props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
+                                    props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
+                                    props.remove(Job.PROPERTY_JOB_STARTED_TIME);
+
+                                    this.moveJob(resolver, rsrc, newPath, props);
+                                } catch (final InstantiationException ie) {
+                                    // something happened with the resource in the meantime
+                                    this.ignoreException(ie);
+                                    resolver.refresh();
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Create the job at the new path, delete the old resource and commit.
+     * A persistence failure is logged in debug mode and the pending changes
+     * are discarded.
+     */
+    private void moveJob(final ResourceResolver resolver,
+            final Resource rsrc,
+            final String newPath,
+            final Map<String, Object> props) {
+        try {
+            ResourceHelper.getOrCreateResource(resolver, newPath, props);
+            resolver.delete(rsrc);
+            resolver.commit();
+        } catch ( final PersistenceException pe ) {
+            this.ignoreException(pe);
+            this.discardPendingChanges(resolver);
+        }
+    }
+
+    /**
+     * Drop the uncommitted changes of a failed operation and update the resolver.
+     * refresh() alone keeps pending changes, so without the revert a single
+     * failed create/delete would be part of every following commit.
+     */
+    private void discardPendingChanges(final ResourceResolver resolver) {
+        resolver.revert();
+        resolver.refresh();
+    }
+
+    /**
+     * One maintenance run
+     * @param topologyCapabilities The current topology capabilities (may be null)
+     * @param queueManager The queue configuration manager
+     * @param topologyChanged If true, jobs of stopped instances are reassigned
+     * @param configChanged If true (or if topologyChanged is true), unassigned
+     *                      jobs are checked for assignment
+     */
+    public void run(final TopologyCapabilities topologyCapabilities,
+            final QueueConfigurationManager queueManager,
+            final boolean topologyChanged,
+            final boolean configChanged) {
+        // if topology changed, reschedule assigned jobs for stopped instances
+        if ( topologyChanged ) {
+            this.reassignJobs(topologyCapabilities, queueManager);
+        }
+        // try to assign unassigned jobs
+        if ( topologyChanged || configChanged ) {
+            this.assignUnassignedJobs(topologyCapabilities, queueManager);
+        }
+    }
+
+    /**
+     * Helper method which just logs the exception in debug mode.
+     * @param e The exception to log
+     */
+    private void ignoreException(final Exception e) {
+        if ( this.logger.isDebugEnabled() ) {
+            this.logger.debug("Ignored exception " + e.getMessage(), e);
+        }
+    }
+}
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/MaintenanceTask.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/MaintenanceTask.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/MaintenanceTask.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/RestartTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/RestartTask.java?rev=1631994&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/RestartTask.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/RestartTask.java Wed Oct 15 11:30:09 2014
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.topology;
+
+import java.util.Iterator;
+
+import org.apache.sling.api.resource.ModifiableValueMap;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.event.impl.jobs.JobImpl;
+import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.topics.JobTopicTraverser;
+import org.apache.sling.event.jobs.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RestartTask {
+
+ /** Logger. */
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ public void run(final JobManagerConfiguration configuration) {
+ this.initialScan(configuration);
+ }
+
+ /**
+ * Scan the resource tree for unfinished jobs from previous runs
+ */
+ private void initialScan(final JobManagerConfiguration configuration) {
+ logger.debug("Scanning repository for unfinished jobs...");
+ final ResourceResolver resolver = configuration.createResourceResolver();
+ try {
+ final Resource baseResource = resolver.getResource(configuration.getLocalJobsPath());
+
+ // sanity check - should never be null
+ if ( baseResource != null ) {
+ final Iterator<Resource> topicIter = baseResource.listChildren();
+ while ( topicIter.hasNext() ) {
+ final Resource topicResource = topicIter.next();
+ logger.debug("Found topic {}", topicResource.getName());
+
+ // init topic
+ initTopic(topicResource);
+ }
+ }
+ } finally {
+ resolver.close();
+ }
+ }
+
+ /**
+ * Initialize a topic and update all jobs from that topic.
+ * Reset started time and increase retry count of unfinished jobs
+ * @param topicResource The topic resource
+ */
+ private void initTopic(final Resource topicResource) {
+ logger.debug("Initializing topic {}...", topicResource.getName());
+
+ JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.Handler() {
+
+ @Override
+ public boolean handle(final JobImpl job) {
+ if ( job.getProcessingStarted() != null ) {
+ job.retry();
+ try {
+ final Resource jobResource = topicResource.getResourceResolver().getResource(job.getResourcePath());
+ // sanity check
+ if ( jobResource != null ) {
+ final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
+ mvm.remove(Job.PROPERTY_JOB_STARTED_TIME);
+ mvm.put(Job.PROPERTY_JOB_RETRY_COUNT, job.getRetryCount());
+ jobResource.getResourceResolver().commit();
+ }
+ } catch ( final PersistenceException ignore) {
+ logger.error("Unable to update unfinished job " + job, ignore);
+ }
+ }
+ return true;
+ }
+ });
+ logger.debug("Topic {} initialized", topicResource.getName());
+ }
+}
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/RestartTask.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/RestartTask.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/RestartTask.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/TopologyCapabilities.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/TopologyCapabilities.java?rev=1631994&r1=1631993&r2=1631994&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/TopologyCapabilities.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/TopologyCapabilities.java Wed Oct 15 11:30:09 2014
@@ -30,6 +30,8 @@ import org.apache.sling.discovery.Instan
import org.apache.sling.discovery.TopologyView;
import org.apache.sling.event.impl.jobs.JobImpl;
import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
import org.apache.sling.event.impl.support.Environment;
import org.apache.sling.event.jobs.QueueConfiguration;
@@ -73,6 +75,9 @@ public class TopologyCapabilities {
/** JobManagerConfiguration. */
private final JobManagerConfiguration jobManagerConfiguration;
+ /** Queue config manager. */
+ private final QueueConfigurationManager queueManager;
+
public static final class InstanceDescriptionComparator implements Comparator<InstanceDescription> {
private final String localClusterId;
@@ -120,8 +125,11 @@ public class TopologyCapabilities {
return allInstances;
}
- public TopologyCapabilities(final TopologyView view, final JobManagerConfiguration config) {
+ public TopologyCapabilities(final TopologyView view,
+ final QueueConfigurationManager queueManager,
+ final JobManagerConfiguration config) {
this.jobManagerConfiguration = config;
+ this.queueManager = queueManager;
this.instanceComparator = new InstanceDescriptionComparator(view.getLocalInstance().getClusterView().getId());
this.isLeader = view.getLocalInstance().isLeader();
this.allInstances = getAllInstancesMap(view);
@@ -156,11 +164,11 @@ public class TopologyCapabilities {
public boolean isActive() {
return this.active;
}
-
+/*
public long getChangeCount() {
return this.changeCount;
}
-
+*/
public boolean isActive(final String instanceId) {
return this.allInstances.containsKey(instanceId);
}
@@ -280,4 +288,17 @@ public class TopologyCapabilities {
return this.instanceCapabilities;
}
+ /**
+  * Get the queue info for a topic from the queue configuration manager.
+  * @param topic The job topic
+  * @return The queue info or {@code null} if these capabilities are not active
+  */
+ public QueueInfo getQueueInfo(final String topic) {
+     return this.active ? this.queueManager.getQueueInfo(topic) : null;
+ }
+
+ /**
+  * Get all queue configurations from the queue configuration manager.
+  * @return The configurations or {@code null} if these capabilities are not active
+  */
+ public InternalQueueConfiguration[] getQueueConfigurations() {
+     return this.active ? this.queueManager.getConfigurations() : null;
+ }
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/TopologyHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/TopologyHandler.java?rev=1631994&r1=1631993&r2=1631994&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/TopologyHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/TopologyHandler.java Wed Oct 15 11:30:09 2014
@@ -22,25 +22,30 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.discovery.TopologyEvent;
import org.apache.sling.discovery.TopologyEvent.Type;
import org.apache.sling.discovery.TopologyEventListener;
-import org.apache.sling.discovery.TopologyView;
import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueConfigurationChangeListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * The topology handler listens for topology events
+ * The topology handler listens for topology events.
+ *
+ * TODO - config changes should actually do a real stop/start
*/
@Component(immediate=true)
@Service(value={TopologyHandler.class, TopologyEventListener.class})
public class TopologyHandler
- implements TopologyEventListener {
+ implements TopologyEventListener, QueueConfigurationChangeListener {
/** Logger. */
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -51,20 +56,75 @@ public class TopologyHandler
@Reference
private JobManagerConfiguration configuration;
+ @Reference
+ private QueueConfigurationManager queueManager;
+
/** The topology capabilities. */
private volatile TopologyCapabilities topologyCapabilities;
- private void stopProcessing() {
+ /**
+  * Register this handler as a listener at the queue configuration manager.
+  */
+ @Activate
+ protected void activate() {
+     queueManager.addListener(this);
+ }
+
+ /**
+  * Remove this handler as a listener from the queue configuration manager.
+  */
+ @Deactivate
+ protected void dectivate() {
+     queueManager.removeListener(this);
+ }
+
+
+ /**
+  * Called when the queue configuration changed: restart processing with
+  * the current topology capabilities, flagged as a configuration change.
+  * Nothing is done if there are currently no capabilities.
+  */
+ @Override
+ public void configChanged() {
+     synchronized ( this.listeners ) {
+         // read the capabilities while holding the lock: a topology event
+         // handled in between could otherwise replace or deactivate them
+         // and the stale capabilities would be installed again below
+         final TopologyCapabilities caps = this.topologyCapabilities;
+         if ( caps != null ) {
+             // this.stopProcessing(false);
+
+             this.startProcessing(Type.PROPERTIES_CHANGED, caps, true);
+         }
+     }
+ }
+
+ private void stopProcessing(final boolean deactivate) {
+ boolean notify = this.topologyCapabilities != null;
// deactivate old capabilities - this stops all background processes
- if ( this.topologyCapabilities != null ) {
+ if ( deactivate && this.topologyCapabilities != null ) {
this.topologyCapabilities.deactivate();
}
this.topologyCapabilities = null;
+
+ if ( notify ) {
+ // stop all listeners
+ this.notifiyListeners();
+ }
}
- private void startProcessing(final TopologyView view) {
+ private void startProcessing(final Type eventType, final TopologyCapabilities newCaps, final boolean isConfigChange) {
// create new capabilities and update view
- this.topologyCapabilities = new TopologyCapabilities(view, this.configuration);
+ this.topologyCapabilities = newCaps;
+
+ // before we propagate the new topology we do some maintenance
+ if ( eventType == Type.TOPOLOGY_INIT ) {
+ final UpgradeTask task = new UpgradeTask();
+ task.run(this.configuration, this.topologyCapabilities, queueManager);
+
+ final RestartTask rt = new RestartTask();
+ rt.run(this.configuration);
+ }
+
+ final MaintenanceTask mt = new MaintenanceTask(this.configuration);
+ mt.run(topologyCapabilities, queueManager, !isConfigChange, isConfigChange);
+
+ if ( !isConfigChange ) {
+ // start listeners
+ this.notifiyListeners();
+ }
+ }
+
+ private void notifiyListeners() {
+ for(final TopologyAware l : this.listeners) {
+ l.topologyChanged(this.topologyCapabilities);
+ }
}
/**
@@ -86,22 +146,15 @@ public class TopologyHandler
synchronized ( this.listeners ) {
if ( event.getType() == Type.TOPOLOGY_CHANGING ) {
- this.stopProcessing();
+ this.stopProcessing(true);
- for(final TopologyAware l : this.listeners) {
- l.topologyChanged(this.topologyCapabilities);
- }
} else if ( event.getType() == Type.TOPOLOGY_INIT
|| event.getType() == Type.TOPOLOGY_CHANGED
|| event.getType() == Type.PROPERTIES_CHANGED ) {
- this.stopProcessing();
-
- this.startProcessing(event.getNewView());
+ this.stopProcessing(true);
- for(final TopologyAware l : this.listeners) {
- l.topologyChanged(this.topologyCapabilities);
- }
+ this.startProcessing(event.getType(), new TopologyCapabilities(event.getNewView(), this.queueManager, this.configuration), false);
}
}
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/UpgradeTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/UpgradeTask.java?rev=1631994&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/UpgradeTask.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/UpgradeTask.java Wed Oct 15 11:30:09 2014
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.topology;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.event.impl.jobs.JobImpl;
+import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
+import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Upgrade task
+ *
+ * Upgrade jobs from earlier versions to the new format.
+ * The work is only done if {@code topologyCapabilities.isLeader()} is true.
+ */
+public class UpgradeTask {
+
+    /** Logger. */
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    /**
+     * Upgrade
+     * @param configuration The job manager configuration
+     * @param topologyCapabilities The current topology capabilities
+     * @param queueManager The queue configuration manager
+     */
+    public void run(final JobManagerConfiguration configuration,
+            final TopologyCapabilities topologyCapabilities,
+            final QueueConfigurationManager queueManager) {
+        if ( topologyCapabilities.isLeader() ) {
+            this.processJobsFromPreviousVersions(configuration, topologyCapabilities, queueManager);
+        }
+    }
+
+    /**
+     * Handle jobs from previous versions (<= 3.1.4) by moving them to the unassigned area
+     * A persistence failure stops the upgrade and is logged as a warning.
+     */
+    private void processJobsFromPreviousVersions(final JobManagerConfiguration configuration,
+            final TopologyCapabilities caps,
+            final QueueConfigurationManager queueManager) {
+        final ResourceResolver resolver = configuration.createResourceResolver();
+        try {
+            this.processJobsFromPreviousVersions(configuration, caps, queueManager, resolver.getResource(configuration.getPreviousVersionAnonPath()));
+            this.processJobsFromPreviousVersions(configuration, caps, queueManager, resolver.getResource(configuration.getPreviousVersionIdentifiedPath()));
+        } catch ( final PersistenceException pe ) {
+            this.logger.warn("Problems moving jobs from previous version.", pe);
+        } finally {
+            resolver.close();
+        }
+    }
+
+    /**
+     * Recursively find jobs and move them
+     * A resource which is not a job is deleted after its children have been
+     * processed, as long as the capabilities are still active.
+     * @param rsrc The resource to process (may be null)
+     * @throws PersistenceException If moving a job or deleting a resource fails
+     */
+    private void processJobsFromPreviousVersions(final JobManagerConfiguration configuration,
+            final TopologyCapabilities caps,
+            final QueueConfigurationManager queueManager,
+            final Resource rsrc) throws PersistenceException {
+        if ( rsrc != null && caps.isActive() ) {
+            if ( rsrc.isResourceType(ResourceHelper.RESOURCE_TYPE_JOB) ) {
+                this.moveJobFromPreviousVersion(configuration, caps, queueManager, rsrc);
+            } else {
+                for(final Resource child : rsrc.getChildren()) {
+                    this.processJobsFromPreviousVersions(configuration, caps, queueManager, child);
+                }
+                if ( caps.isActive() ) {
+                    rsrc.getResourceResolver().delete(rsrc);
+                    rsrc.getResourceResolver().commit();
+                    rsrc.getResourceResolver().refresh();
+                }
+            }
+        }
+    }
+
+    /**
+     * Read the serialized properties stored in "slingevent:properties".
+     * NOTE(review): this uses Java deserialization on repository content;
+     * confirm that this content can be trusted.
+     * @return The deserialized properties, empty if the property does not exist
+     * @throws PersistenceException If the properties can not be deserialized
+     */
+    private Map<String, Object> readBinaryProperties(final ValueMap vm)
+    throws PersistenceException {
+        final Map<String, Object> binaryProperties = new HashMap<String, Object>();
+        final ObjectInputStream ois = vm.get("slingevent:properties", ObjectInputStream.class);
+        if ( ois != null ) {
+            boolean readSucceeded = false;
+            try {
+                final int length = ois.readInt();
+                for(int i=0;i<length;i++) {
+                    final String key = (String)ois.readObject();
+                    final Object value = ois.readObject();
+                    binaryProperties.put(key, value);
+                }
+                readSucceeded = true;
+            } catch (final ClassNotFoundException cnfe) {
+                throw new PersistenceException("Class not found.", cnfe);
+            } catch (final java.io.InvalidClassException ice) {
+                throw new PersistenceException("Invalid class.", ice);
+            } catch (final IOException ioe) {
+                throw new PersistenceException("Unable to deserialize job properties.", ioe);
+            } finally {
+                try {
+                    ois.close();
+                } catch (final IOException ioe) {
+                    // only report the close failure if reading succeeded,
+                    // throwing here otherwise hides the exception from above
+                    if ( readSucceeded ) {
+                        throw new PersistenceException("Unable to deserialize job properties.", ioe);
+                    }
+                    this.logger.debug("Ignored exception while closing job properties stream", ioe);
+                }
+            }
+        }
+        return binaryProperties;
+    }
+
+    /**
+     * Move a single job
+     * The job is dropped if its queue is of type DROP. Otherwise it is
+     * created with a new job id at a new path and the old resource is deleted.
+     * @param jobResource The resource of the old job
+     * @throws PersistenceException If the job can not be read or moved
+     */
+    private void moveJobFromPreviousVersion(final JobManagerConfiguration configuration,
+            final TopologyCapabilities caps,
+            final QueueConfigurationManager queueManager,
+            final Resource jobResource)
+    throws PersistenceException {
+        final ResourceResolver resolver = jobResource.getResourceResolver();
+
+        try {
+            final ValueMap vm = ResourceHelper.getValueMap(jobResource);
+            // check for binary properties
+            final Map<String, Object> binaryProperties = this.readBinaryProperties(vm);
+
+            final Map<String, Object> properties = ResourceHelper.cloneValueMap(vm);
+
+            properties.put(JobImpl.PROPERTY_BRIDGED_EVENT, true);
+            final String topic = (String)properties.remove("slingevent:topic");
+            properties.put(ResourceHelper.PROPERTY_JOB_TOPIC, topic);
+
+            properties.remove(Job.PROPERTY_JOB_QUEUE_NAME);
+            properties.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
+            // and binary properties
+            properties.putAll(binaryProperties);
+            properties.remove("slingevent:properties");
+
+            if ( !properties.containsKey(Job.PROPERTY_JOB_RETRIES) ) {
+                properties.put(Job.PROPERTY_JOB_RETRIES, 10); // we put a dummy value here; this gets updated by the queue
+            }
+            if ( !properties.containsKey(Job.PROPERTY_JOB_RETRY_COUNT) ) {
+                properties.put(Job.PROPERTY_JOB_RETRY_COUNT, 0);
+            }
+
+            final List<InstanceDescription> potentialTargets = caps.getPotentialTargets("/", null);
+            String targetId = null;
+            if ( potentialTargets != null && potentialTargets.size() > 0 ) {
+                final QueueInfo info = queueManager.getQueueInfo(topic);
+                logger.debug("Found queue {} for {}", info.queueConfiguration, topic);
+                // if queue is configured to drop, we drop
+                if ( info.queueConfiguration.getType() == QueueConfiguration.Type.DROP) {
+                    resolver.delete(jobResource);
+                    resolver.commit();
+                    return;
+                }
+                if ( info.queueConfiguration.getType() != QueueConfiguration.Type.IGNORE ) {
+                    targetId = caps.detectTarget(topic, vm, info);
+                    if ( targetId != null ) {
+                        properties.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName);
+                        properties.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
+                        properties.put(Job.PROPERTY_JOB_RETRIES, info.queueConfiguration.getMaxRetries());
+                    }
+                }
+            }
+
+            properties.put(Job.PROPERTY_JOB_CREATED_INSTANCE, "old:" + Environment.APPLICATION_ID);
+            properties.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, ResourceHelper.RESOURCE_TYPE_JOB);
+
+            final String jobId = configuration.getUniqueId(topic);
+            properties.put(ResourceHelper.PROPERTY_JOB_ID, jobId);
+            properties.remove(Job.PROPERTY_JOB_STARTED_TIME);
+
+            // targetId is null here if no target instance could be detected
+            final String newPath = configuration.getUniquePath(targetId, topic, jobId, vm);
+            this.logger.debug("Moving 'old' job from {} to {}", jobResource.getPath(), newPath);
+
+            ResourceHelper.getOrCreateResource(resolver, newPath, properties);
+            resolver.delete(jobResource);
+            resolver.commit();
+        } catch (final InstantiationException ie) {
+            // keep the exception itself as the cause, its own cause is usually not set
+            throw new PersistenceException("Exception while reading resource: " + ie.getMessage(), ie);
+        }
+    }
+}
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/UpgradeTask.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/UpgradeTask.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/topology/UpgradeTask.java
------------------------------------------------------------------------------
svn:mime-type = text/plain