You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/04/22 13:42:55 UTC
svn commit: r1470462 [3/7] - in /sling/trunk/bundles/extensions/event: ./
src/main/java/org/apache/sling/event/
src/main/java/org/apache/sling/event/impl/
src/main/java/org/apache/sling/event/impl/dea/
src/main/java/org/apache/sling/event/impl/jobs/ sr...
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,707 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.jackrabbit.util.ISO8601;
+import org.apache.jackrabbit.util.ISO9075;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.QuerySyntaxException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.api.resource.ResourceUtil;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
+import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.event.jobs.JobUtil.JobPriority;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maintenance task...
+ *
+ * In the default configuration, this task runs every minute
+ */
+public class MaintenanceTask {
+
+ /** Logger. */
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ /** Resource resolver factory. */
+ private final ResourceResolverFactory resourceResolverFactory;
+
+ /** Job manager configuration. */
+ private final JobManagerConfiguration configuration;
+
+ /** Change count for queue configurations .*/
+ private volatile long queueConfigChangeCount = -1;
+
+ /** Change count for topology changes .*/
+ private volatile long topologyChangeCount = -1;
+
+ private boolean checkedForPreviousVersion = false;
+
+ /**
+ * Constructor
+ */
+ public MaintenanceTask(final JobManagerConfiguration config, final ResourceResolverFactory factory) {
+ this.resourceResolverFactory = factory;
+ this.configuration = config;
+ }
+
+ private void reassignJobs(final TopologyCapabilities caps,
+ final QueueConfigurationManager queueManager) {
+ if ( caps != null && caps.isLeader() ) {
+ this.logger.debug("Checking for stopped instances...");
+ ResourceResolver resolver = null;
+ try {
+ resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+ final Resource jobsRoot = resolver.getResource(this.configuration.getAssginedJobsPath());
+ this.logger.debug("Got jobs root {}", jobsRoot);
+
+ // this resource should exist, but we check anyway
+ if ( jobsRoot != null ) {
+ final Iterator<Resource> instanceIter = jobsRoot.listChildren();
+ while ( caps.isActive() && instanceIter.hasNext() ) {
+ final Resource instanceResource = instanceIter.next();
+
+ final String instanceId = instanceResource.getName();
+ if ( !caps.isActive(instanceId) ) {
+ logger.debug("Found stopped instance {}", instanceId);
+ assignJobs(caps, queueManager, instanceResource, true);
+ }
+ }
+ }
+ } catch ( final LoginException le ) {
+ this.ignoreException(le);
+ } finally {
+ if ( resolver != null ) {
+ resolver.close();
+ }
+ }
+ }
+ }
+
+ /**
+ * Try to assign unassigned jobs as there might be changes in:
+ * - queue configurations
+ * - topology
+ * - capabilities
+ */
+ private void assignUnassignedJobs(final TopologyCapabilities caps,
+ final QueueConfigurationManager queueManager) {
+ if ( caps != null && caps.isLeader() ) {
+ logger.debug("Checking unassigned jobs...");
+ ResourceResolver resolver = null;
+ try {
+ resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+ final Resource unassignedRoot = resolver.getResource(this.configuration.getUnassignedJobsPath());
+ logger.debug("Got unassigned root {}", unassignedRoot);
+
+ // this resource should exist, but we check anyway
+ if ( unassignedRoot != null ) {
+ assignJobs(caps, queueManager, unassignedRoot, false);
+ }
+ } catch ( final LoginException le ) {
+ this.ignoreException(le);
+ } finally {
+ if ( resolver != null ) {
+ resolver.close();
+ }
+ }
+ }
+ }
+
+ /**
+ * Try to assign all jobs from the jobs root.
+ * The jobs are stored by topic
+ */
+ private void assignJobs(final TopologyCapabilities caps,
+ final QueueConfigurationManager queueManager,
+ final Resource jobsRoot,
+ final boolean unassign) {
+ final ResourceResolver resolver = jobsRoot.getResourceResolver();
+
+ final Iterator<Resource> topicIter = jobsRoot.listChildren();
+ while ( caps.isActive() && topicIter.hasNext() ) {
+ final Resource topicResource = topicIter.next();
+
+ final String topicName = topicResource.getName().replace('.', '/');
+ logger.debug("Found topic {}", topicName);
+
+ final String checkTopic;
+ if ( topicName.equals(JobImpl.PROPERTY_BRIDGED_EVENT) ) {
+ checkTopic = "/";
+ } else {
+ checkTopic = topicName;
+ }
+
+ // first check if there is an instance for these topics
+ final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(checkTopic, null);
+ if ( potentialTargets != null && potentialTargets.size() > 0 ) {
+ final QueueInfo info = queueManager.getQueueInfo(topicName);
+ logger.debug("Found queue {} for {}", info.queueConfiguration, topicName);
+ // if queue is configured to drop, we drop
+ if ( info.queueConfiguration.getType() == QueueConfiguration.Type.DROP) {
+ final Iterator<Resource> i = topicResource.listChildren();
+ while ( caps.isActive() && i.hasNext() ) {
+ final Resource rsrc = i.next();
+ try {
+ resolver.delete(rsrc);
+ resolver.commit();
+ } catch ( final PersistenceException pe ) {
+ this.ignoreException(pe);
+ resolver.refresh();
+ }
+ }
+ } else if ( info.queueConfiguration.getType() != QueueConfiguration.Type.IGNORE ) {
+ // if the queue is not configured to ignore, we can reschedule
+ for(final Resource yearResource : topicResource.getChildren() ) {
+ for(final Resource monthResource : yearResource.getChildren() ) {
+ for(final Resource dayResource : monthResource.getChildren() ) {
+ for(final Resource hourResource : dayResource.getChildren() ) {
+ for(final Resource minuteResource : hourResource.getChildren() ) {
+ for(final Resource rsrc : minuteResource.getChildren() ) {
+
+ if ( !caps.isActive() ) {
+ return;
+ }
+
+ final ValueMap vm = ResourceUtil.getValueMap(rsrc);
+ final String targetId = caps.detectTarget(topicName, vm, info);
+
+ if ( targetId != null ) {
+ final String newPath = this.configuration.getAssginedJobsPath() + '/' + targetId + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
+ final Map<String, Object> props = new HashMap<String, Object>(vm);
+ props.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName);
+ props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
+ props.remove(Job.PROPERTY_JOB_STARTED_TIME);
+ try {
+ ResourceHelper.getOrCreateResource(resolver, newPath, props);
+ resolver.delete(rsrc);
+ resolver.commit();
+ } catch ( final PersistenceException pe ) {
+ this.ignoreException(pe);
+ resolver.refresh();
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ if ( caps.isActive() && unassign ) {
+ // we have to move everything to the unassigned area
+ for(final Resource yearResource : topicResource.getChildren() ) {
+ for(final Resource monthResource : yearResource.getChildren() ) {
+ for(final Resource dayResource : monthResource.getChildren() ) {
+ for(final Resource hourResource : dayResource.getChildren() ) {
+ for(final Resource minuteResource : hourResource.getChildren() ) {
+ for(final Resource rsrc : minuteResource.getChildren() ) {
+
+ if ( !caps.isActive() ) {
+ return;
+ }
+
+ final ValueMap vm = ResourceUtil.getValueMap(rsrc);
+ final String newPath = this.configuration.getUnassignedJobsPath() + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
+ final Map<String, Object> props = new HashMap<String, Object>(vm);
+ props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
+ props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
+ props.remove(Job.PROPERTY_JOB_STARTED_TIME);
+
+ try {
+ ResourceHelper.getOrCreateResource(resolver, newPath, props);
+ resolver.delete(rsrc);
+ resolver.commit();
+ } catch ( final PersistenceException pe ) {
+ this.ignoreException(pe);
+ resolver.refresh();
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Check if the topology has changed.
+ */
+ private boolean topologyHasChanged(final TopologyCapabilities topologyCapabilities) {
+ boolean topologyChanged = false;
+ if ( topologyCapabilities != null ) {
+ if ( this.topologyChangeCount < topologyCapabilities.getChangeCount() ) {
+ this.topologyChangeCount = topologyCapabilities.getChangeCount();
+ topologyChanged = true;
+ }
+ }
+ return topologyChanged;
+ }
+
+ private boolean queueConfigurationHasChanged(final TopologyCapabilities topologyCapabilities,
+ final QueueConfigurationManager queueManager) {
+ boolean configChanged = false;
+ if ( topologyCapabilities != null ) {
+ final int queueChangeCount = queueManager.getChangeCount();
+ if ( this.queueConfigChangeCount < queueChangeCount ) {
+ configChanged = true;
+ this.queueConfigChangeCount = queueChangeCount;
+ }
+ }
+ return configChanged;
+ }
+
+ /**
+ * One maintenance run
+ */
+ public void run(final TopologyCapabilities topologyCapabilities,
+ final QueueConfigurationManager queueManager,
+ final long cleanUpCounter) {
+ // check topology and config change during each invocation
+ final boolean topologyChanged = this.topologyHasChanged(topologyCapabilities);
+ final boolean configChanged = this.queueConfigurationHasChanged(topologyCapabilities, queueManager);
+
+ // if topology changed, reschedule assigned jobs for stopped instances
+ if ( topologyChanged ) {
+ this.reassignJobs(topologyCapabilities, queueManager);
+ }
+ // try to assign unassigned jobs
+ if ( topologyChanged || configChanged ) {
+ this.assignUnassignedJobs(topologyCapabilities, queueManager);
+ }
+
+ if ( topologyChanged && !this.checkedForPreviousVersion && topologyCapabilities != null && topologyCapabilities.isLeader() ) {
+ this.processJobsFromPreviousVersions(topologyCapabilities, queueManager);
+ }
+
+ // Clean up
+ final String cleanUpAssignedPath;;
+ if ( topologyCapabilities != null && topologyCapabilities.isLeader() ) {
+ cleanUpAssignedPath = this.configuration.getUnassignedJobsPath();
+ } else {
+ cleanUpAssignedPath = null;
+ }
+
+ if ( cleanUpCounter % 60 == 0 ) { // full clean up is done every hour
+ this.fullEmptyFolderCleanup(topologyCapabilities, this.configuration.getLocalJobsPath());
+ if ( cleanUpAssignedPath != null ) {
+ this.fullEmptyFolderCleanup(topologyCapabilities, cleanUpAssignedPath);
+ }
+ } else if ( cleanUpCounter % 5 == 0 ) { // simple clean up every 5 minutes
+ this.simpleEmptyFolderCleanup(topologyCapabilities, this.configuration.getLocalJobsPath());
+ if ( cleanUpAssignedPath != null ) {
+ this.simpleEmptyFolderCleanup(topologyCapabilities, cleanUpAssignedPath);
+ }
+ }
+
+ // lock cleanup is done every 3 minutes
+ if ( cleanUpCounter % 3 == 0 ) {
+ this.lockCleanup(topologyCapabilities);
+ }
+ }
+
+ /**
+ * Clean up the locks
+ * All locks older than three minutes are removed
+ */
+ private void lockCleanup(final TopologyCapabilities caps) {
+ if ( caps != null && caps.isLeader() ) {
+ this.logger.debug("Cleaning up job resource tree: removing obsolete locks");
+ ResourceResolver resolver = null;
+ try {
+ resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+ final Calendar startDate = Calendar.getInstance();
+ startDate.add(Calendar.MINUTE, -3);
+
+ final StringBuilder buf = new StringBuilder(64);
+
+ buf.append("//element(*)[@");
+ buf.append(ISO9075.encode(Utility.PROPERTY_LOCK_CREATED));
+ buf.append(" < xs:dateTime('");
+ buf.append(ISO8601.format(startDate));
+ buf.append("')]");
+ final Iterator<Resource> result = resolver.findResources(buf.toString(), "xpath");
+
+ while ( caps.isActive() && result.hasNext() ) {
+ final Resource lockResource = result.next();
+ // sanity check for the path
+ if ( this.configuration.isLock(lockResource.getPath()) ) {
+ try {
+ resolver.delete(lockResource);
+ resolver.commit();
+ } catch ( final PersistenceException pe) {
+ this.ignoreException(pe);
+ resolver.refresh();
+ }
+ }
+ }
+ } catch (final QuerySyntaxException qse) {
+ this.ignoreException(qse);
+ } catch (final LoginException le) {
+ this.ignoreException(le);
+ } finally {
+ if ( resolver != null ) {
+ resolver.close();
+ }
+ }
+ }
+ }
+
+ /**
+ * Simple empty folder removes empty folders for the last five minutes
+ * from an hour ago!
+ * If folder for minute 59 is removed, we check the hour folder as well.
+ */
+ private void simpleEmptyFolderCleanup(final TopologyCapabilities caps, final String basePath) {
+ this.logger.debug("Cleaning up job resource tree: looking for empty folders");
+ ResourceResolver resolver = null;
+ try {
+ resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+ final Calendar cleanUpDate = Calendar.getInstance();
+ // go back ten minutes
+ cleanUpDate.add(Calendar.HOUR, -1);
+
+ final Resource baseResource = resolver.getResource(basePath);
+ // sanity check - should never be null
+ if ( baseResource != null ) {
+ final Iterator<Resource> topicIter = baseResource.listChildren();
+ while ( caps.isActive() && topicIter.hasNext() ) {
+ final Resource topicResource = topicIter.next();
+
+ for(int i = 0; i < 5; i++) {
+ if ( caps.isActive() ) {
+ final StringBuilder sb = new StringBuilder(topicResource.getPath());
+ sb.append('/');
+ sb.append(cleanUpDate.get(Calendar.YEAR));
+ sb.append('/');
+ sb.append(cleanUpDate.get(Calendar.MONTH) + 1);
+ sb.append('/');
+ sb.append(cleanUpDate.get(Calendar.DAY_OF_MONTH));
+ sb.append('/');
+ sb.append(cleanUpDate.get(Calendar.HOUR_OF_DAY));
+ final String path = sb.toString();
+
+ final Resource dateResource = resolver.getResource(path);
+ if ( dateResource != null && !dateResource.listChildren().hasNext() ) {
+ resolver.delete(dateResource);
+ resolver.commit();
+ }
+ // check hour folder
+ if ( path.endsWith("59") ) {
+ final String hourPath = path.substring(0, path.length() - 3);
+ final Resource hourResource = resolver.getResource(hourPath);
+ if ( hourResource != null && !hourResource.listChildren().hasNext() ) {
+ resolver.delete(hourResource);
+ resolver.commit();
+ }
+ }
+ cleanUpDate.add(Calendar.MINUTE, -1);
+ }
+ }
+ }
+ }
+
+ } catch (final PersistenceException pe) {
+ // in the case of an error, we just log this as a warning
+ this.logger.warn("Exception during job resource tree cleanup.", pe);
+ } catch (final LoginException ignore) {
+ this.ignoreException(ignore);
+ } finally {
+ if ( resolver != null ) {
+ resolver.close();
+ }
+ }
+ }
+
+ /**
+ * Full cleanup - this scans all directories!
+ */
+ private void fullEmptyFolderCleanup(final TopologyCapabilities caps, final String basePath) {
+ this.logger.debug("Cleaning up job resource tree: removing ALL empty folders");
+ ResourceResolver resolver = null;
+ try {
+ resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+
+ final Resource baseResource = resolver.getResource(basePath);
+ // sanity check - should never be null
+ if ( baseResource != null ) {
+ final Calendar now = Calendar.getInstance();
+
+ final Iterator<Resource> topicIter = baseResource.listChildren();
+ while ( caps.isActive() && topicIter.hasNext() ) {
+ final Resource topicResource = topicIter.next();
+
+ // now years
+ final Iterator<Resource> yearIter = topicResource.listChildren();
+ while ( caps.isActive() && yearIter.hasNext() ) {
+ final Resource yearResource = yearIter.next();
+ final int year = Integer.valueOf(yearResource.getName());
+ final boolean oldYear = year < now.get(Calendar.YEAR);
+
+ // months
+ final Iterator<Resource> monthIter = yearResource.listChildren();
+ while ( caps.isActive() && monthIter.hasNext() ) {
+ final Resource monthResource = monthIter.next();
+ final int month = Integer.valueOf(monthResource.getName());
+ final boolean oldMonth = oldYear || month < (now.get(Calendar.MONTH) + 1);
+
+ // days
+ final Iterator<Resource> dayIter = monthResource.listChildren();
+ while ( caps.isActive() && dayIter.hasNext() ) {
+ final Resource dayResource = dayIter.next();
+ final int day = Integer.valueOf(dayResource.getName());
+ final boolean oldDay = oldMonth || day < now.get(Calendar.DAY_OF_MONTH);
+
+ // hours
+ final Iterator<Resource> hourIter = dayResource.listChildren();
+ while ( caps.isActive() && hourIter.hasNext() ) {
+ final Resource hourResource = hourIter.next();
+ final int hour = Integer.valueOf(hourResource.getName());
+ final boolean oldHour = (oldDay && (oldMonth || now.get(Calendar.HOUR_OF_DAY) > 0)) || hour < (now.get(Calendar.HOUR_OF_DAY) -1);
+
+ // we only remove minutes if the hour is old
+ if ( oldHour ) {
+ final Iterator<Resource> minuteIter = hourResource.listChildren();
+ while ( caps.isActive() && minuteIter.hasNext() ) {
+ final Resource minuteResource = minuteIter.next();
+
+ // check if we can delete the minute
+ if ( !minuteResource.listChildren().hasNext() ) {
+ resolver.delete(minuteResource);
+ resolver.commit();
+ }
+ }
+ }
+
+ // check if we can delete the hour
+ if ( caps.isActive() && oldHour && !hourResource.listChildren().hasNext()) {
+ resolver.delete(hourResource);
+ resolver.commit();
+ }
+ }
+ // check if we can delete the day
+ if ( caps.isActive() && oldDay && !dayResource.listChildren().hasNext()) {
+ resolver.delete(dayResource);
+ resolver.commit();
+ }
+ }
+
+ // check if we can delete the month
+ if ( caps.isActive() && oldMonth && !monthResource.listChildren().hasNext() ) {
+ resolver.delete(monthResource);
+ resolver.commit();
+ }
+ }
+
+ // check if we can delete the year
+ if ( caps.isActive() && oldYear && !yearResource.listChildren().hasNext() ) {
+ resolver.delete(yearResource);
+ resolver.commit();
+ }
+ }
+ }
+ }
+
+ } catch (final PersistenceException pe) {
+ // in the case of an error, we just log this as a warning
+ this.logger.warn("Exception during job resource tree cleanup.", pe);
+ } catch (final LoginException ignore) {
+ this.ignoreException(ignore);
+ } finally {
+ if ( resolver != null ) {
+ resolver.close();
+ }
+ }
+ }
+
+ public void reassignJob(final JobImpl job, final String targetId) {
+ ResourceResolver resolver = null;
+ try {
+ resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+ final Resource jobResource = resolver.getResource(job.getResourcePath());
+ if ( jobResource != null ) {
+ final ValueMap vm = ResourceUtil.getValueMap(jobResource);
+ final String newPath = this.configuration.getUniquePath(targetId, job.getTopic(), job.getId(), job.getProperties());
+
+ final Map<String, Object> props = new HashMap<String, Object>(vm);
+ props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
+ if ( targetId == null ) {
+ props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
+ } else {
+ props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
+ }
+ props.remove(Job.PROPERTY_JOB_STARTED_TIME);
+
+ try {
+ ResourceHelper.getOrCreateResource(resolver, newPath, props);
+ resolver.delete(jobResource);
+ resolver.commit();
+ } catch ( final PersistenceException pe ) {
+ this.ignoreException(pe);
+ resolver.refresh();
+ }
+ }
+ } catch (final LoginException ignore) {
+ this.ignoreException(ignore);
+ } finally {
+ if ( resolver != null ) {
+ resolver.close();
+ }
+ }
+ }
+
+ /**
+ * Helper method which just logs the exception in debug mode.
+ * @param e
+ */
+ private void ignoreException(final Exception e) {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Ignored exception " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Handle jobs from previous versions (<= 3.1.4) by moving them to the unassigned area
+ */
+ private void processJobsFromPreviousVersions(final TopologyCapabilities caps,
+ final QueueConfigurationManager queueManager) {
+ ResourceResolver resolver = null;
+ try {
+ resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+
+ this.processJobsFromPreviousVersions(caps, queueManager, resolver.getResource(this.configuration.getPreviousVersionAnonPath()));
+ this.processJobsFromPreviousVersions(caps, queueManager, resolver.getResource(this.configuration.getPreviousVersionIdentifiedPath()));
+ this.checkedForPreviousVersion = true;
+ } catch ( final PersistenceException pe ) {
+ this.logger.warn("Problems moving jobs from previous version.", pe);
+ } catch ( final LoginException le ) {
+ this.ignoreException(le);
+ } finally {
+ if ( resolver != null ) {
+ resolver.close();
+ }
+ }
+ }
+
+ /**
+ * Recursively find jobs and move them
+ */
+ private void processJobsFromPreviousVersions(final TopologyCapabilities caps,
+ final QueueConfigurationManager queueManager,
+ final Resource rsrc) throws PersistenceException {
+ if ( rsrc != null && caps.isActive() ) {
+ if ( rsrc.isResourceType(ResourceHelper.RESOURCE_TYPE_JOB) ) {
+ this.moveJobFromPreviousVersion(caps, queueManager, rsrc);
+ } else {
+ for(final Resource child : rsrc.getChildren()) {
+ this.processJobsFromPreviousVersions(caps, queueManager, child);
+ }
+ if ( caps.isActive() ) {
+ rsrc.getResourceResolver().delete(rsrc);
+ rsrc.getResourceResolver().commit();
+ rsrc.getResourceResolver().refresh();
+ }
+ }
+ }
+ }
+
+ /**
+ * Move a single job
+ */
+ private void moveJobFromPreviousVersion(final TopologyCapabilities caps,
+ final QueueConfigurationManager queueManager,
+ final Resource jobResource)
+ throws PersistenceException {
+ final ResourceResolver resolver = jobResource.getResourceResolver();
+
+ final ValueMap vm = ResourceUtil.getValueMap(jobResource);
+ final Map<String, Object> properties = ResourceHelper.cloneValueMap(vm);
+
+ properties.put(JobImpl.PROPERTY_BRIDGED_EVENT, true);
+ final String topic = (String)properties.remove("slingevent:topic");
+ properties.put(JobUtil.PROPERTY_JOB_TOPIC, topic);
+
+ properties.remove(Job.PROPERTY_JOB_QUEUE_NAME);
+ properties.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
+
+ if ( !properties.containsKey(Job.PROPERTY_JOB_RETRIES) ) {
+ properties.put(Job.PROPERTY_JOB_RETRIES, 10); // we put a dummy value here; this gets updated by the queue
+ }
+ if ( !properties.containsKey(Job.PROPERTY_JOB_RETRY_COUNT) ) {
+ properties.put(Job.PROPERTY_JOB_RETRY_COUNT, 0);
+ }
+ properties.put(Job.PROPERTY_JOB_PRIORITY, JobPriority.NORM.name());
+
+ final List<InstanceDescription> potentialTargets = caps.getPotentialTargets("/", null);
+ String targetId = null;
+ if ( potentialTargets != null && potentialTargets.size() > 0 ) {
+ final QueueInfo info = queueManager.getQueueInfo(topic);
+ logger.debug("Found queue {} for {}", info.queueConfiguration, topic);
+ // if queue is configured to drop, we drop
+ if ( info.queueConfiguration.getType() == QueueConfiguration.Type.DROP) {
+ resolver.delete(jobResource);
+ resolver.commit();
+ return;
+ }
+ if ( info.queueConfiguration.getType() != QueueConfiguration.Type.IGNORE ) {
+ targetId = caps.detectTarget(topic, vm, info);
+ if ( targetId != null ) {
+ properties.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName);
+ properties.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
+ properties.put(Job.PROPERTY_JOB_RETRIES, info.queueConfiguration.getMaxRetries());
+ properties.put(Job.PROPERTY_JOB_PRIORITY, info.queueConfiguration.getPriority().name());
+ }
+ }
+ }
+
+ properties.put(Job.PROPERTY_JOB_CREATED_INSTANCE, "old:" + Environment.APPLICATION_ID);
+ properties.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, ResourceHelper.RESOURCE_TYPE_JOB);
+
+ final String jobId = this.configuration.getUniqueId(topic);
+ properties.put(JobUtil.JOB_ID, jobId);
+ properties.remove(Job.PROPERTY_JOB_STARTED_TIME);
+
+ final String newPath = this.configuration.getUniquePath(targetId, topic, jobId, vm);
+ this.logger.debug("Moving 'old' job from {} to {}", jobResource.getPath(), newPath);
+
+ ResourceHelper.getOrCreateResource(resolver, newPath, properties);
+ resolver.delete(jobResource);
+ resolver.commit();
+ }
+}
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.discovery.TopologyView;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
+import org.apache.sling.event.jobs.JobConsumer;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The capabilities of a topology.
+ */
+public class TopologyCapabilities {
+
+ /** Logger. */
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ /** Map: key: topic, value: sling IDs */
+ private final Map<String, List<InstanceDescription>> instanceCapabilities;
+
+ /** Round robin map. */
+ private final Map<String, Integer> roundRobinMap = new HashMap<String, Integer>();
+
+ /** Is this the leader of the cluster? */
+ private final boolean isLeader;
+
+ /** Is this still active? */
+ private volatile boolean active = true;
+
+ /** Change count. */
+ private final long changeCount;
+
+ /** All instances. */
+ private final Map<String, String> allInstances;
+
+ private static final class InstanceDescriptionComparator implements Comparator<InstanceDescription> {
+
+ @Override
+ public int compare(final InstanceDescription o1, final InstanceDescription o2) {
+ if ( o1.getSlingId().equals(o2.getSlingId()) ) {
+ return 0;
+ }
+ if ( o1.isLeader() ) {
+ return -1;
+ } else if ( o2.isLeader() ) {
+ return 1;
+ }
+ return o1.getSlingId().compareTo(o2.getSlingId());
+ }
+ }
+ private static final InstanceDescriptionComparator COMPARATOR = new InstanceDescriptionComparator();
+
+ public static Map<String, String> getAllInstancesMap(final TopologyView view) {
+ final Map<String, String> allInstances = new TreeMap<String, String>();
+
+ for(final InstanceDescription desc : view.getInstances() ) {
+ final String topics = desc.getProperty(JobConsumer.PROPERTY_TOPICS);
+ if ( topics != null && topics.length() > 0 ) {
+ allInstances.put(desc.getSlingId(), topics);
+ } else {
+ allInstances.put(desc.getSlingId(), "");
+ }
+ }
+ return allInstances;
+ }
+
+ public TopologyCapabilities(final TopologyView view, final long changeCount) {
+ this.changeCount = changeCount;
+ this.isLeader = view.getLocalInstance().isLeader();
+ this.allInstances = getAllInstancesMap(view);
+ final Map<String, List<InstanceDescription>> newCaps = new HashMap<String, List<InstanceDescription>>();
+ for(final InstanceDescription desc : view.getInstances() ) {
+ final String topics = desc.getProperty(JobConsumer.PROPERTY_TOPICS);
+ if ( topics != null && topics.length() > 0 ) {
+ this.logger.debug("Capabilities of {} : {}", desc.getSlingId(), topics);
+ for(final String topic : topics.split(",") ) {
+ List<InstanceDescription> list = newCaps.get(topic);
+ if ( list == null ) {
+ list = new ArrayList<InstanceDescription>();
+ newCaps.put(topic, list);
+ }
+ list.add(desc);
+ Collections.sort(list, COMPARATOR);
+ }
+ }
+ }
+ this.instanceCapabilities = newCaps;
+ }
+
+ public boolean isSame(final Map<String, String> newAllInstancesMap) {
+ return this.allInstances.equals(newAllInstancesMap);
+ }
+
+ public void deactivate() {
+ this.active = false;
+ }
+
+ public boolean isActive() {
+ return this.active;
+ }
+
+ public long getChangeCount() {
+ return this.changeCount;
+ }
+
+ public boolean isActive(final String instanceId) {
+ return this.allInstances.containsKey(instanceId);
+ }
+ /**
+ * Is the current instance the leader?
+ */
+ public boolean isLeader() {
+ return this.isLeader;
+ }
+
+ /**
+ * Return the potential targets (Sling IDs) sorted by ID
+ */
+ public List<InstanceDescription> getPotentialTargets(final String jobTopic, final Map<String, Object> jobProperties) {
+ // calculate potential targets
+ final List<InstanceDescription> potentialTargets = new ArrayList<InstanceDescription>();
+
+ // first: topic targets - directly handling the topic
+ final List<InstanceDescription> topicTargets = this.instanceCapabilities.get(jobTopic);
+ if ( topicTargets != null ) {
+ potentialTargets.addAll(topicTargets);
+ }
+ // second: category targets - handling the topic category
+ final int pos = jobTopic.lastIndexOf('/');
+ if ( pos > 0 ) {
+ final String category = jobTopic.substring(0, pos + 1).concat("*");
+ final List<InstanceDescription> categoryTargets = this.instanceCapabilities.get(category);
+ if ( categoryTargets != null ) {
+ potentialTargets.addAll(categoryTargets);
+ }
+ }
+ // third: bridged consumers
+ final List<InstanceDescription> bridgedTargets = (jobProperties != null && jobProperties.containsKey(JobImpl.PROPERTY_BRIDGED_EVENT) ? this.instanceCapabilities.get("/") : null);
+ if ( bridgedTargets != null ) {
+ potentialTargets.addAll(bridgedTargets);
+ }
+ Collections.sort(potentialTargets, COMPARATOR);
+
+ return potentialTargets;
+ }
+
+ /**
+ * Detect the target instance.
+ */
+ public String detectTarget(final String jobTopic, final Map<String, Object> jobProperties,
+ final QueueInfo queueInfo) {
+ final List<InstanceDescription> potentialTargets = this.getPotentialTargets(jobTopic, jobProperties);
+
+ if ( potentialTargets != null && potentialTargets.size() > 0 ) {
+ if ( queueInfo.queueConfiguration.getType() == QueueConfiguration.Type.ORDERED ) {
+ // for ordered queues we always pick the first as we have to pick the same target
+ // on all instances (TODO - we could try to do some round robin of the whole queue)
+ return potentialTargets.get(0).getSlingId();
+ }
+ // TODO - this is a simple round robin which is not based on the actual load
+ // of the instances
+ Integer index = this.roundRobinMap.get(jobTopic);
+ if ( index == null ) {
+ index = 0;
+ }
+ if ( index >= potentialTargets.size() ) {
+ index = 0;
+ }
+ this.roundRobinMap.put(jobTopic, index + 1);
+ return potentialTargets.get(index).getSlingId();
+ }
+
+ return null;
+ }
+}
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java?rev=1470462&r1=1470461&r2=1470462&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java Mon Apr 22 11:42:53 2013
@@ -18,16 +18,14 @@
*/
package org.apache.sling.event.impl.jobs;
-import java.io.UnsupportedEncodingException;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.BitSet;
import java.util.Calendar;
import java.util.Dictionary;
+import java.util.HashMap;
import java.util.Hashtable;
+import java.util.Map;
-import org.apache.sling.event.impl.EnvironmentComponent;
-import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.JobUtil;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
@@ -35,174 +33,196 @@ import org.osgi.service.event.EventConst
public abstract class Utility {
- /** Allowed characters for a node name */
- private static final BitSet ALLOWED_CHARS;
+ public static final String PROPERTY_LOCK_CREATED = "lock.created";
+ public static final String PROPERTY_LOCK_CREATED_APP = "lock.created.app";
- /** Replacement characters for unallowed characters in a node name */
- private static final char REPLACEMENT_CHAR = '_';
-
- // Prepare the ALLOWED_CHARS bitset with bits indicating the unicode
- // character index of allowed characters. We deliberately only support
- // a subset of the actually allowed set of characters for nodes ...
- static {
- final String allowed = "ABCDEFGHIJKLMNOPQRSTUVWXYZ abcdefghijklmnopqrstuvwxyz0123456789_,.-+#!?$%&()=";
- final BitSet allowedSet = new BitSet();
- for (int i = 0; i < allowed.length(); i++) {
- allowedSet.set(allowed.charAt(i));
+ /**
+ * Check the job topic.
+ * @return <code>null</code> if the topic is correct, otherwise an error description is returned
+ */
+ public static String checkJobTopic(final Object jobTopic) {
+ final String message;
+ if ( jobTopic != null ) {
+ if ( jobTopic instanceof String ) {
+ boolean topicIsCorrect = false;
+ try {
+ new Event((String)jobTopic, (Dictionary<String, Object>)null);
+ topicIsCorrect = true;
+ } catch (final IllegalArgumentException iae) {
+ // we just have to catch it
+ }
+ if ( !topicIsCorrect ) {
+ message = "Discarding job : job has an illegal job topic";
+ } else {
+ message = null;
+ }
+ } else {
+ message = "Discarding job : job topic is not of type string";
+ }
+ } else {
+ message = "Discarding job : job topic is missing";
}
- ALLOWED_CHARS = allowedSet;
+ return message;
}
+ /** Event property containing the time for job start and job finished events. */
+ public static final String PROPERTY_TIME = "time";
+
/**
- * Filter the node name for not allowed characters and replace them.
- * @param nodeName The suggested node name.
- * @return The filtered node name.
+ * Helper method for sending the notification events.
*/
- public static String filter(final String nodeName) {
- final StringBuilder sb = new StringBuilder(nodeName.length());
- char lastAdded = 0;
-
- for(int i=0; i < nodeName.length(); i++) {
- final char c = nodeName.charAt(i);
- char toAdd = c;
-
- if (!ALLOWED_CHARS.get(c)) {
- if (lastAdded == REPLACEMENT_CHAR) {
- // do not add several _ in a row
- continue;
- }
- toAdd = REPLACEMENT_CHAR;
-
- } else if(i == 0 && Character.isDigit(c)) {
- sb.append(REPLACEMENT_CHAR);
+ public static void sendNotification(final EventAdmin eventAdmin,
+ final String eventTopic,
+ final String jobTopic,
+ final String jobName,
+ final Map<String, Object> jobProperties,
+ final Long time) {
+ if ( eventAdmin != null ) {
+ // create job object
+ final Map<String, Object> jobProps;
+ if ( jobProperties == null ) {
+ jobProps = new HashMap<String, Object>();
+ } else {
+ jobProps = jobProperties;
}
-
- sb.append(toAdd);
- lastAdded = toAdd;
+ final Job job = new JobImpl(jobTopic, jobName, "<unknown>", jobProps);
+ sendNotificationInternal(eventAdmin, eventTopic, job, time);
}
+ }
- if (sb.length()==0) {
- sb.append(REPLACEMENT_CHAR);
+ /**
+ * Helper method for sending the notification events.
+ */
+ public static void sendNotification(final EventAdmin eventAdmin,
+ final String eventTopic,
+ final Job job,
+ final Long time) {
+ if ( eventAdmin != null ) {
+ // create new copy of job object
+ final Job jobCopy = new JobImpl(job.getTopic(), job.getName(), job.getId(), ((JobImpl)job).getProperties());
+ sendNotificationInternal(eventAdmin, eventTopic, jobCopy, time);
}
-
- return sb.toString();
}
/**
- * used for the md5
+ * Helper method for sending the notification events.
*/
- private static final char[] HEX_TABLE = "0123456789abcdef".toCharArray();
+ private static void sendNotificationInternal(final EventAdmin eventAdmin,
+ final String eventTopic,
+ final Job job,
+ final Long time) {
+ final Dictionary<String, Object> eventProps = new Hashtable<String, Object>();
+ // add basic job properties
+ eventProps.put(JobUtil.NOTIFICATION_PROPERTY_JOB_ID, job.getId());
+ eventProps.put(JobUtil.NOTIFICATION_PROPERTY_JOB_TOPIC, job.getTopic());
+ if ( job.getName() != null ) {
+ eventProps.put(JobUtil.NOTIFICATION_PROPERTY_JOB_NAME, job.getName());
+ }
+ // copy paylod
+ for(final String name : job.getPropertyNames()) {
+ eventProps.put(name, job.getProperty(name));
+ }
+ // add timestamp
+ eventProps.put(EventConstants.TIMESTAMP, System.currentTimeMillis());
+ // add internal time information
+ if ( time != null ) {
+ eventProps.put(PROPERTY_TIME, time);
+ }
+ // make distributable
+ eventProps.put(EventUtil.PROPERTY_DISTRIBUTE, "true");
+ // compatibility:
+ eventProps.put(JobUtil.PROPERTY_NOTIFICATION_JOB, toEvent(job));
+ eventAdmin.postEvent(new Event(eventTopic, eventProps));
+ }
/**
- * Calculate an MD5 hash of the string given using 'utf-8' encoding.
- *
- * @param data the data to encode
- * @return a hex encoded string of the md5 digested input
- */
- public static String md5(String data) {
- try {
- return digest("MD5", data.getBytes("utf-8"));
- } catch (NoSuchAlgorithmException e) {
- throw new InternalError("MD5 digest not available???");
- } catch (UnsupportedEncodingException e) {
- throw new InternalError("UTF8 digest not available???");
- }
- }
-
- /**
- * Digest the plain string using the given algorithm.
- *
- * @param algorithm The alogrithm for the digest. This algorithm must be
- * supported by the MessageDigest class.
- * @param data the data to digest with the given algorithm
- * @return The digested plain text String represented as Hex digits.
- * @throws java.security.NoSuchAlgorithmException if the desired algorithm is not supported by
- * the MessageDigest class.
- */
- private static String digest(String algorithm, byte[] data)
- throws NoSuchAlgorithmException {
- MessageDigest md = MessageDigest.getInstance(algorithm);
- byte[] digest = md.digest(data);
- StringBuilder res = new StringBuilder(digest.length * 2);
- for (int i = 0; i < digest.length; i++) {
- byte b = digest[i];
- res.append(HEX_TABLE[(b >> 4) & 15]);
- res.append(HEX_TABLE[b & 15]);
- }
- return res.toString();
- }
-
- /** Counter for jobs without an id. We don't need to sync the access. */
- private static long JOB_COUNTER = 0;
-
- /**
- * Create a unique node path (folder and name) for the job.
- */
- public static String getUniquePath(final String jobTopic, final String jobId) {
- final String convTopic = jobTopic.replace('/', '.');
- if ( jobId != null ) {
- final StringBuilder sb = new StringBuilder("identified/");
- sb.append(convTopic);
- sb.append('/');
- // we create an md from the job id - we use the first 6 bytes to
- // create sub directories
- final String md5 = md5(jobId);
- sb.append(md5.charAt(0));
- sb.append(md5.charAt(1));
- sb.append(md5.charAt(2));
- sb.append('/');
- sb.append(md5.charAt(3));
- sb.append(md5.charAt(4));
- sb.append(md5.charAt(5));
- sb.append('/');
- sb.append(filter(jobId));
- return sb.toString();
- }
- final Calendar now = Calendar.getInstance();
- // create a time based path together with the Sling ID
- final StringBuilder sb = getAnonPath(now);
- sb.append('/');
- sb.append(convTopic);
- sb.append('_');
- sb.append(JOB_COUNTER);
- JOB_COUNTER++;
- return sb.toString();
+ * Create an event from a job
+ * @param job The job
+ * @return New event object.
+ */
+ public static Event toEvent(final Job job) {
+ final Map<String, Object> eventProps = new HashMap<String, Object>();
+ eventProps.putAll(((JobImpl)job).getProperties());
+ if ( job.getName() != null ) {
+ eventProps.put(JobUtil.PROPERTY_JOB_NAME, job.getName());
+ }
+ eventProps.put(JobUtil.JOB_ID, job.getId());
+ return new Event(job.getTopic(), eventProps);
}
- public static StringBuilder getAnonPath(final Calendar now) {
- final StringBuilder sb = new StringBuilder("anon/");
- // create a time based path together with the Sling ID
- sb.append(Environment.APPLICATION_ID);
- sb.append('/');
- sb.append(now.get(Calendar.YEAR));
- sb.append('/');
- sb.append(now.get(Calendar.MONTH) + 1);
- sb.append('/');
- sb.append(now.get(Calendar.DAY_OF_MONTH));
- sb.append('/');
- sb.append(now.get(Calendar.HOUR_OF_DAY));
- sb.append('/');
- sb.append(now.get(Calendar.MINUTE));
- return sb;
+ /**
+ * Append properties to the string builder
+ */
+ private static void appendProperties(final StringBuilder sb,
+ final Map<String, Object> properties) {
+ if ( properties != null ) {
+ sb.append(", properties=");
+ boolean first = true;
+ for(final String propName : properties.keySet()) {
+ if ( propName.equals(JobUtil.JOB_ID)
+ || propName.equals(JobUtil.PROPERTY_JOB_NAME)
+ || propName.equals(JobUtil.PROPERTY_JOB_TOPIC) ) {
+ continue;
+ }
+ if ( first ) {
+ first = false;
+ } else {
+ sb.append(",");
+ }
+ sb.append(propName);
+ sb.append('=');
+ final Object value = properties.get(propName);
+ // the toString() method of Calendar is very verbose
+ // therefore we do a toString for these objects based
+ // on a date
+ if ( value instanceof Calendar ) {
+ sb.append(value.getClass().getName());
+ sb.append('(');
+ sb.append(((Calendar)value).getTime());
+ sb.append(')');
+ } else {
+ sb.append(value);
+ }
+ }
+ }
+
}
+ /**
+ * Improved toString method for a job.
+ * This method prints out the job topic and all of the properties.
+ */
+ public static String toString(final String jobTopic,
+ final String name,
+ final Map<String, Object> properties) {
+ final StringBuilder sb = new StringBuilder("Sling Job ");
+ sb.append("[topic=");
+ sb.append(jobTopic);
+ if ( name != null ) {
+ sb.append(", name=");
+ sb.append(name);
+ }
+ appendProperties(sb, properties);
- /** Event property containing the time for job start and job finished events. */
- public static final String PROPERTY_TIME = "time";
+ sb.append("]");
+ return sb.toString();
+ }
/**
- * Helper method for sending the notification events.
+ * Improved toString method for a job.
+ * This method prints out the job topic and all of the properties.
*/
- public static void sendNotification(final EnvironmentComponent environment,
- final String topic,
- final Event job,
- final Long time) {
- final EventAdmin localEA = environment.getEventAdmin();
- final Dictionary<String, Object> props = new Hashtable<String, Object>();
- props.put(JobUtil.PROPERTY_NOTIFICATION_JOB, job);
- props.put(EventConstants.TIMESTAMP, System.currentTimeMillis());
- if ( time != null ) {
- props.put(PROPERTY_TIME, time);
+ public static String toString(final Job job) {
+ final StringBuilder sb = new StringBuilder("Sling Job ");
+ sb.append("[topic=");
+ sb.append(job.getTopic());
+ sb.append(", id=");
+ sb.append(job.getId());
+ if ( job.getName() != null ) {
+ sb.append(", name=");
+ sb.append(job.getName());
}
- localEA.postEvent(new Event(topic, props));
+ appendProperties(sb, ((JobImpl)job).getProperties());
+ sb.append("]");
+ return sb.toString();
}
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java?rev=1470462&r1=1470461&r2=1470462&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java Mon Apr 22 11:42:53 2013
@@ -27,7 +27,6 @@ public abstract class ConfigurationConst
public static final String DEFAULT_TYPE = "UNORDERED";
public static final String DEFAULT_PRIORITY = "NORM";
- public static final boolean DEFAULT_RUN_LOCAL = false;
public static final int DEFAULT_RETRIES = 10;
public static final long DEFAULT_RETRY_DELAY = 2000;
public static final int DEFAULT_MAX_PARALLEL = 15;
@@ -39,6 +38,4 @@ public abstract class ConfigurationConst
public static final String PROP_RETRIES = "queue.retries";
public static final String PROP_RETRY_DELAY = "queue.retrydelay";
public static final String PROP_PRIORITY = "queue.priority";
- public static final String PROP_RUN_LOCAL = "queue.runlocal";
- public static final String PROP_APP_IDS = "queue.applicationids";
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java?rev=1470462&r1=1470461&r2=1470462&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java Mon Apr 22 11:42:53 2013
@@ -30,18 +30,16 @@ import org.apache.felix.scr.annotations.
import org.apache.felix.scr.annotations.PropertyUnbounded;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.commons.osgi.PropertiesUtil;
-import org.apache.sling.event.EventUtil;
-import org.apache.sling.event.impl.jobs.JobEvent;
-import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.event.impl.support.TopicMatcher;
+import org.apache.sling.event.impl.support.TopicMatcherHelper;
import org.apache.sling.event.jobs.JobUtil;
import org.apache.sling.event.jobs.QueueConfiguration;
import org.osgi.framework.Constants;
-import org.osgi.service.event.Event;
@Component(metatype=true,name="org.apache.sling.event.jobs.QueueConfiguration",
label="%queue.name", description="%queue.description",
configurationFactory=true,policy=ConfigurationPolicy.REQUIRE)
-@Service(value=InternalQueueConfiguration.class)
+@Service(value={InternalQueueConfiguration.class})
@Properties({
@Property(name=ConfigurationConstants.PROP_NAME),
@Property(name=ConfigurationConstants.PROP_TYPE,
@@ -49,7 +47,8 @@ import org.osgi.service.event.Event;
options={@PropertyOption(name="UNORDERED",value="Parallel"),
@PropertyOption(name="ORDERED",value="Ordered"),
@PropertyOption(name="TOPIC_ROUND_ROBIN",value="Topic Round Robin"),
- @PropertyOption(name="IGNORE",value="Ignore")}),
+ @PropertyOption(name="PULL",value="Equal Distribution"),
+ @PropertyOption(name="DROP",value="Drop")}),
@Property(name=ConfigurationConstants.PROP_TOPICS,
unbounded=PropertyUnbounded.ARRAY),
@Property(name=ConfigurationConstants.PROP_MAX_PARALLEL,
@@ -62,11 +61,7 @@ import org.osgi.service.event.Event;
value=ConfigurationConstants.DEFAULT_PRIORITY,
options={@PropertyOption(name="NORM",value="Norm"),
@PropertyOption(name="MIN",value="Min"),
- @PropertyOption(name="MAX",value="Max")}),
- @Property(name=ConfigurationConstants.PROP_RUN_LOCAL,
- boolValue=ConfigurationConstants.DEFAULT_RUN_LOCAL),
- @Property(name=ConfigurationConstants.PROP_APP_IDS,
- unbounded=PropertyUnbounded.ARRAY)
+ @PropertyOption(name="MAX",value="Max")})
})
public class InternalQueueConfiguration
implements QueueConfiguration {
@@ -83,23 +78,17 @@ public class InternalQueueConfiguration
/** Retry delay. */
private long retryDelay;
- /** Local queue? */
- private boolean runLocal;
-
/** Thread priority. */
private JobUtil.JobPriority priority;
/** The maximum number of parallel processes (for non ordered queues) */
private int maxParallelProcesses;
- /** Optional application ids where this queue is running on. */
- private String[] applicationIds;
-
/** The ordering. */
private int serviceRanking;
/** The matchers for topics. */
- private Matcher[] matchers;
+ private TopicMatcher[] matchers;
/** The configured topics. */
private String[] topics;
@@ -130,43 +119,15 @@ public class InternalQueueConfiguration
this.name = PropertiesUtil.toString(params.get(ConfigurationConstants.PROP_NAME), null);
this.priority = JobUtil.JobPriority.valueOf(PropertiesUtil.toString(params.get(ConfigurationConstants.PROP_PRIORITY), ConfigurationConstants.DEFAULT_PRIORITY));
this.type = Type.valueOf(PropertiesUtil.toString(params.get(ConfigurationConstants.PROP_TYPE), ConfigurationConstants.DEFAULT_TYPE));
- this.runLocal = PropertiesUtil.toBoolean(params.get(ConfigurationConstants.PROP_RUN_LOCAL), ConfigurationConstants.DEFAULT_RUN_LOCAL);
this.retries = PropertiesUtil.toInteger(params.get(ConfigurationConstants.PROP_RETRIES), ConfigurationConstants.DEFAULT_RETRIES);
this.retryDelay = PropertiesUtil.toLong(params.get(ConfigurationConstants.PROP_RETRY_DELAY), ConfigurationConstants.DEFAULT_RETRY_DELAY);
final int maxParallel = PropertiesUtil.toInteger(params.get(ConfigurationConstants.PROP_MAX_PARALLEL), ConfigurationConstants.DEFAULT_MAX_PARALLEL);
this.maxParallelProcesses = (maxParallel == -1 ? ConfigurationConstants.NUMBER_OF_PROCESSORS : maxParallel);
- final String appIds[] = PropertiesUtil.toStringArray(params.get(ConfigurationConstants.PROP_APP_IDS));
- if ( appIds == null
- || appIds.length == 0
- || (appIds.length == 1 && (appIds[0] == null || appIds[0].length() == 0)) ) {
- this.applicationIds = null;
- } else {
- this.applicationIds = appIds;
- }
final String[] topicsParam = PropertiesUtil.toStringArray(params.get(ConfigurationConstants.PROP_TOPICS));
- if ( topicsParam == null
- || topicsParam.length == 0
- || (topicsParam.length == 1 && (topicsParam[0] == null || topicsParam[0].length() == 0))) {
- matchers = null;
+ this.matchers = TopicMatcherHelper.buildMatchers(topicsParam);
+ if ( this.matchers == null ) {
this.topics = null;
} else {
- final Matcher[] newMatchers = new Matcher[topicsParam.length];
- for(int i=0; i < topicsParam.length; i++) {
- String value = topicsParam[i];
- if ( value != null ) {
- value = value.trim();
- }
- if ( value != null && value.length() > 0 ) {
- if ( value.endsWith(".") ) {
- newMatchers[i] = new PackageMatcher(value);
- } else if ( value.endsWith("*") ) {
- newMatchers[i] = new SubPackageMatcher(value);
- } else {
- newMatchers[i] = new ClassMatcher(value);
- }
- }
- }
- matchers = newMatchers;
this.topics = topicsParam;
}
this.serviceRanking = PropertiesUtil.toInteger(params.get(Constants.SERVICE_RANKING), 0);
@@ -174,63 +135,6 @@ public class InternalQueueConfiguration
this.valid = this.checkIsValid();
}
- public InternalQueueConfiguration(final Event jobEvent) {
- this.name = (String)jobEvent.getProperty(JobUtil.PROPERTY_JOB_QUEUE_NAME);
- if ( jobEvent.getProperty(JobUtil.PROPERTY_JOB_QUEUE_ORDERED) != null ) {
- this.type = Type.ORDERED;
- this.maxParallelProcesses = 1;
- } else {
- this.type = Type.UNORDERED;
- int maxPar = ConfigurationConstants.DEFAULT_MAX_PARALLEL;
- final Object value = jobEvent.getProperty(JobUtil.PROPERTY_JOB_PARALLEL);
- if ( value != null ) {
- if ( value instanceof Boolean ) {
- final boolean result = ((Boolean)value).booleanValue();
- if ( !result ) {
- maxPar = 1;
- }
- } else if ( value instanceof Number ) {
- final int result = ((Number)value).intValue();
- if ( result > 1 ) {
- maxPar = result;
- } else {
- maxPar = 1;
- }
- } else {
- final String strValue = value.toString();
- if ( "no".equalsIgnoreCase(strValue) || "false".equalsIgnoreCase(strValue) ) {
- maxPar = 1;
- } else {
- // check if this is a number
- try {
- final int result = Integer.valueOf(strValue).intValue();
- if ( result > 1 ) {
- maxPar = result;
- } else {
- maxPar = 1;
- }
- } catch (NumberFormatException ne) {
- // we ignore this
- }
- }
- }
- }
- if ( maxPar == -1 ) {
- maxPar = ConfigurationConstants.NUMBER_OF_PROCESSORS;
- }
- this.maxParallelProcesses = maxPar;
- }
- this.priority = JobUtil.JobPriority.valueOf(ConfigurationConstants.DEFAULT_PRIORITY);
- this.runLocal = false;
- this.retries = ConfigurationConstants.DEFAULT_RETRIES;
- this.retryDelay = ConfigurationConstants.DEFAULT_RETRY_DELAY;
- this.serviceRanking = 0;
- this.applicationIds = null;
- this.matchers = null;
- this.topics = new String[] {"<Custom:" + jobEvent.getProperty(JobUtil.PROPERTY_JOB_TOPIC) + ">"};
- this.valid = true;
- }
-
/**
* Check if this configuration is valid,
* If it is invalid, it is ignored.
@@ -238,7 +142,7 @@ public class InternalQueueConfiguration
private boolean checkIsValid() {
boolean hasMatchers = false;
if ( this.matchers != null ) {
- for(final Matcher m : this.matchers ) {
+ for(final TopicMatcher m : this.matchers ) {
if ( m != null ) {
hasMatchers = true;
}
@@ -267,22 +171,21 @@ public class InternalQueueConfiguration
/**
* Check if the queue processes the event.
- * @param event The event
+ * @param topic The topic of the event
+ * @return The queue name or <code>null</code>
*/
- public boolean match(final JobEvent event) {
- final String topic = (String)event.event.getProperty(JobUtil.PROPERTY_JOB_TOPIC);
+ public String match(final String topic) {
if ( this.matchers != null ) {
- for(final Matcher m : this.matchers ) {
+ for(final TopicMatcher m : this.matchers ) {
if ( m != null ) {
final String rep = m.match(topic);
if ( rep != null ) {
- event.queueName = this.name.replace("{0}", rep);
- return true;
+ return this.name.replace("{0}", rep);
}
}
}
}
- return false;
+ return null;
}
/**
@@ -293,37 +196,9 @@ public class InternalQueueConfiguration
}
/**
- * Checks if the event should be skipped.
- * This can happen if
- * - the queue is of type ignore
- * - the queue is bound to some application id
- * - the event is a local event generated with a different application id
- */
- public boolean isSkipped(final JobEvent event) {
- if ( this.type == Type.IGNORE ) {
- return true;
- }
- if ( this.applicationIds != null ) {
- boolean found = false;
- for(final String id : this.applicationIds) {
- if ( Environment.APPLICATION_ID.equals(id) ) {
- found = true;
- }
- }
- if ( !found ) {
- return true;
- }
- }
- if ( this.runLocal
- && !event.event.getProperty(EventUtil.PROPERTY_APPLICATION).equals(Environment.APPLICATION_ID) ) {
- return true;
- }
- return false;
- }
-
- /**
* @see org.apache.sling.event.jobs.QueueConfiguration#getRetryDelayInMs()
*/
+ @Override
public long getRetryDelayInMs() {
return this.retryDelay;
}
@@ -331,6 +206,7 @@ public class InternalQueueConfiguration
/**
* @see org.apache.sling.event.jobs.QueueConfiguration#getMaxRetries()
*/
+ @Override
public int getMaxRetries() {
return this.retries;
}
@@ -338,6 +214,7 @@ public class InternalQueueConfiguration
/**
* @see org.apache.sling.event.jobs.QueueConfiguration#getType()
*/
+ @Override
public Type getType() {
return this.type;
}
@@ -345,6 +222,7 @@ public class InternalQueueConfiguration
/**
* @see org.apache.sling.event.jobs.QueueConfiguration#getPriority()
*/
+ @Override
public JobUtil.JobPriority getPriority() {
return this.priority;
}
@@ -352,27 +230,21 @@ public class InternalQueueConfiguration
/**
* @see org.apache.sling.event.jobs.QueueConfiguration#getMaxParallel()
*/
+ @Override
public int getMaxParallel() {
return this.maxParallelProcesses;
}
- /**
- * @see org.apache.sling.event.jobs.QueueConfiguration#isLocalQueue()
- */
+ @Override
+ @Deprecated
public boolean isLocalQueue() {
- return this.runLocal;
- }
-
- /**
- * @see org.apache.sling.event.jobs.QueueConfiguration#getApplicationIds()
- */
- public String[] getApplicationIds() {
- return this.applicationIds;
+ return false;
}
/**
* @see org.apache.sling.event.jobs.QueueConfiguration#getTopics()
*/
+ @Override
public String[] getTopics() {
return this.topics;
}
@@ -380,6 +252,7 @@ public class InternalQueueConfiguration
/**
* @see org.apache.sling.event.jobs.QueueConfiguration#getRanking()
*/
+ @Override
public int getRanking() {
return this.serviceRanking;
}
@@ -389,6 +262,12 @@ public class InternalQueueConfiguration
}
@Override
+ @Deprecated
+ public String[] getApplicationIds() {
+ return null;
+ }
+
+ @Override
public String toString() {
return "Queue-Configuration(" + this.hashCode() + ") : {" +
"name=" + this.name +
@@ -397,80 +276,8 @@ public class InternalQueueConfiguration
", maxParallelProcesses=" + this.maxParallelProcesses +
", retries=" + this.retries +
", retryDelayInMs= " + this.retryDelay +
- ", applicationIds= " + (this.applicationIds == null ? "[]" : Arrays.toString(this.applicationIds)) +
", serviceRanking=" + this.serviceRanking +
", pid=" + this.pid +
", isValid=" + this.isValid() + "}";
}
-
- /**
- * Internal interface for topic matching
- */
- private static interface Matcher {
- /** Check if the topic matches and return the variable part - null if not matching. */
- String match(String topic);
- }
-
- /** Package matcher - the topic must be in the same package. */
- private static final class PackageMatcher implements Matcher {
-
- private final String packageName;
-
- public PackageMatcher(final String name) {
- // remove last char and maybe a trailing slash
- int lastPos = name.length() - 1;
- if ( lastPos > 0 && name.charAt(lastPos - 1) == '/' ) {
- lastPos--;
- }
- this.packageName = name.substring(0, lastPos);
- }
-
- /**
- * @see org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration.Matcher#match(java.lang.String)
- */
- public String match(final String topic) {
- final int pos = topic.lastIndexOf('/');
- return pos > -1 && topic.substring(0, pos).equals(packageName) ? topic.substring(pos + 1) : null;
- }
- }
-
- /** Sub package matcher - the topic must be in the same package or a sub package. */
- private static final class SubPackageMatcher implements Matcher {
- private final String packageName;
-
- public SubPackageMatcher(final String name) {
- // remove last char and maybe a trailing slash
- int lastPos = name.length() - 1;
- if ( lastPos > 0 && name.charAt(lastPos - 1) == '/' ) {
- this.packageName = name.substring(0, lastPos);
- } else {
- this.packageName = name.substring(0, lastPos) + '/';
- }
- }
-
- /**
- * @see org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration.Matcher#match(java.lang.String)
- */
- public String match(final String topic) {
- final int pos = topic.lastIndexOf('/');
- return pos > -1 && topic.substring(0, pos + 1).startsWith(this.packageName) ? topic.substring(this.packageName.length()) : null;
- }
- }
-
- /** The topic must match exactly. */
- private static final class ClassMatcher implements Matcher {
- private final String className;
-
- public ClassMatcher(final String name) {
- this.className = name;
- }
-
- /**
- * @see org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration.Matcher#match(java.lang.String)
- */
- public String match(String topic) {
- return this.className.equals(topic) ? "" : null;
- }
- }
-
}
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.config;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Properties;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.PropertyOption;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This is the configuration for the main queue.
+ *
+ */
+@Component(label="%job.events.name",
+ description="%job.events.description",
+ name="org.apache.sling.event.impl.jobs.DefaultJobManager",
+ metatype=true)
+@Service(value=MainQueueConfiguration.class)
+@Properties({
+ @Property(name=ConfigurationConstants.PROP_PRIORITY,
+ value=ConfigurationConstants.DEFAULT_PRIORITY,
+ options={@PropertyOption(name="NORM",value="Norm"),
+ @PropertyOption(name="MIN",value="Min"),
+ @PropertyOption(name="MAX",value="Max")}),
+ @Property(name=ConfigurationConstants.PROP_RETRIES,
+ intValue=ConfigurationConstants.DEFAULT_RETRIES),
+ @Property(name=ConfigurationConstants.PROP_RETRY_DELAY,
+ longValue=ConfigurationConstants.DEFAULT_RETRY_DELAY),
+ @Property(name=ConfigurationConstants.PROP_MAX_PARALLEL,
+ intValue=ConfigurationConstants.DEFAULT_MAX_PARALLEL),
+})
+public class MainQueueConfiguration {
+
+ /** Default logger. */
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private InternalQueueConfiguration mainConfiguration;
+
+ /**
+ * Activate this component.
+ * @param props Configuration properties
+ */
+ @Activate
+ protected void activate(final Map<String, Object> props) {
+ this.update(props);
+ }
+
+ /**
+ * Configure this component.
+ * @param props Configuration properties
+ */
+ @Modified
+ protected void update(final Map<String, Object> props) {
+ // create a new dictionary with the missing info and do some sanity puts
+ final Map<String, Object> queueProps = new HashMap<String, Object>(props);
+ queueProps.put(ConfigurationConstants.PROP_TOPICS, "*");
+ queueProps.put(ConfigurationConstants.PROP_NAME, "<main queue>");
+ queueProps.put(ConfigurationConstants.PROP_TYPE, InternalQueueConfiguration.Type.UNORDERED);
+
+ // check max parallel - this should never be lower than 2!
+ final int maxParallel = PropertiesUtil.toInteger(queueProps.get(ConfigurationConstants.PROP_MAX_PARALLEL),
+ ConfigurationConstants.DEFAULT_MAX_PARALLEL);
+ if ( maxParallel < 2 ) {
+ this.logger.debug("Ignoring invalid setting of {} for {}. Setting to minimum value: 2",
+ maxParallel, ConfigurationConstants.PROP_MAX_PARALLEL);
+ queueProps.put(ConfigurationConstants.PROP_MAX_PARALLEL, 2);
+ }
+ this.mainConfiguration = InternalQueueConfiguration.fromConfiguration(queueProps);
+ }
+
+ public InternalQueueConfiguration getMainConfiguration() {
+ return this.mainConfiguration;
+ }
+}
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java?rev=1470462&r1=1470461&r2=1470462&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java Mon Apr 22 11:42:53 2013
@@ -21,18 +21,17 @@ package org.apache.sling.event.impl.jobs
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
-import org.apache.sling.event.impl.jobs.JobEvent;
-import org.apache.sling.event.jobs.JobUtil;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.event.impl.support.ResourceHelper;
import org.osgi.framework.BundleContext;
import org.osgi.util.tracker.ServiceTracker;
/**
- * An event handler for special job events.
- *
- * We schedule this event handler to run in the background and clean up
- * obsolete events.
+ * The queue manager manages queue configurations.
*/
@Component
@Service(value=QueueConfigurationManager.class)
@@ -47,13 +46,16 @@ public class QueueConfigurationManager {
/** Tracker count to detect changes. */
private volatile int lastTrackerCount = -1;
+ @Reference
+ private MainQueueConfiguration mainQueueConfiguration;
/**
* Activate this component.
* Create the service tracker and start it.
*/
@Activate
- protected void activate(final BundleContext bundleContext) {
+ protected void activate(final BundleContext bundleContext)
+ throws LoginException, PersistenceException {
this.configTracker = new ServiceTracker(bundleContext,
InternalQueueConfiguration.class.getName(), null);
this.configTracker.open();
@@ -101,25 +103,42 @@ public class QueueConfigurationManager {
return configurations;
}
+ public InternalQueueConfiguration getMainQueueConfiguration() {
+ return this.mainQueueConfiguration.getMainConfiguration();
+ }
+
+ public static final class QueueInfo {
+ public InternalQueueConfiguration queueConfiguration;
+ public String queueName;
+ public String targetId;
+ }
+
/**
* Find the queue configuration for the job.
* This method only returns a configuration if one matches.
*/
- public InternalQueueConfiguration getQueueConfiguration(final JobEvent event) {
+ public QueueInfo getQueueInfo(final String topic) {
final InternalQueueConfiguration[] configurations = this.getConfigurations();
- final String queueName = (String)event.event.getProperty(JobUtil.PROPERTY_JOB_QUEUE_NAME);
for(final InternalQueueConfiguration config : configurations) {
if ( config.isValid() ) {
- // check for queue name first
- if ( queueName != null && queueName.equals(config.getName()) ) {
- event.queueName = queueName;
- return config;
- }
- if ( config.match(event) ) {
- return config;
+ final String qn = config.match(topic);
+ if ( qn != null ) {
+ final QueueInfo result = new QueueInfo();
+ result.queueConfiguration = config;
+ result.queueName = ResourceHelper.filterName(qn);
+
+ return result;
}
}
}
- return null;
+ final QueueInfo result = new QueueInfo();
+ result.queueConfiguration = this.mainQueueConfiguration.getMainConfiguration();
+ result.queueName = result.queueConfiguration.getName();
+
+ return result;
+ }
+
+ public int getChangeCount() {
+ return this.configTracker.getTrackingCount();
}
}