You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2015/10/26 11:16:43 UTC

svn commit: r1710549 [1/2] - in /sling/trunk/bundles/extensions/event: ./ src/main/java/org/apache/sling/event/ src/main/java/org/apache/sling/event/impl/jobs/ src/main/java/org/apache/sling/event/impl/jobs/config/ src/main/java/org/apache/sling/event/...

Author: cziegeler
Date: Mon Oct 26 10:16:42 2015
New Revision: 1710549

URL: http://svn.apache.org/viewvc?rev=1710549&view=rev
Log:
SLING-5194 : Remove all deprecated features

Removed:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventPropertiesMap.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/JobProcessor.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/JobStatusProvider.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/JobsIterator.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/TimedEventStatusProvider.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/timed/
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobProcessor.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobUtil.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobsIterator.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/package-info.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/EventUtilTest.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DeprecatedJobsTest.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DeprecatedTimedJobsTest.java
Modified:
    sling/trunk/bundles/extensions/event/pom.xml
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobBuilderImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilities.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/InventoryPlugin.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CleanUpTask.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Queue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/package-info.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/JobsImplTest.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilitiesTest.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java

Modified: sling/trunk/bundles/extensions/event/pom.xml
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/pom.xml?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/pom.xml (original)
+++ sling/trunk/bundles/extensions/event/pom.xml Mon Oct 26 10:16:42 2015
@@ -29,7 +29,7 @@
 
     <artifactId>org.apache.sling.event</artifactId>
     <packaging>bundle</packaging>
-    <version>3.7.7-SNAPSHOT</version>
+    <version>4.0.0-SNAPSHOT</version>
 
     <name>Apache Sling Event Support</name>
     <description>

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobBuilderImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobBuilderImpl.java?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobBuilderImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobBuilderImpl.java Mon Oct 26 10:16:42 2015
@@ -56,7 +56,7 @@ public class JobBuilderImpl implements J
 
     @Override
     public Job add(final List<String> errors) {
-        return this.jobManager.addJob(this.topic, null, this.properties, errors);
+        return this.jobManager.addJob(this.topic, this.properties, errors);
     }
 
     @Override

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java Mon Oct 26 10:16:42 2015
@@ -100,9 +100,6 @@ public class JobConsumerManager {
     /** The map with the consumers, keyed by topic, sorted by service ranking. */
     private final Map<String, List<ConsumerInfo>> topicToConsumerMap = new HashMap<String, List<ConsumerInfo>>();
 
-    /** Marker if this instance supports bridged events. */
-    private boolean supportsBridgedEvents;
-
     /** ServiceRegistration for propagation. */
     private ServiceRegistration propagationService;
 
@@ -240,13 +237,6 @@ public class JobConsumerManager {
     }
 
     /**
-     * Does this instance supports bridged events?
-     */
-    public boolean supportsBridgedEvents() {
-        return supportsBridgedEvents;
-    }
-
-    /**
      * Bind a new consumer
      * @param serviceReference The service reference to the consumer.
      */
@@ -304,7 +294,6 @@ public class JobConsumerManager {
                         }
                     }
                 }
-                this.supportsBridgedEvents = this.topicToConsumerMap.containsKey("/");
                 if ( changed ) {
                     this.calculateTopics(this.propagationService != null);
                 }
@@ -354,7 +343,6 @@ public class JobConsumerManager {
                         }
                     }
                 }
-                this.supportsBridgedEvents = this.topicToConsumerMap.containsKey("/");
                 if ( changed ) {
                     this.calculateTopics(this.propagationService != null);
                 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java Mon Oct 26 10:16:42 2015
@@ -29,7 +29,6 @@ import org.apache.sling.api.resource.Val
 import org.apache.sling.api.wrappers.ValueMapDecorator;
 import org.apache.sling.event.impl.support.ResourceHelper;
 import org.apache.sling.event.jobs.Job;
-import org.apache.sling.event.jobs.JobUtil.JobPriority;
 import org.apache.sling.event.jobs.NotificationConstants;
 import org.apache.sling.event.jobs.Queue;
 
@@ -41,9 +40,6 @@ public class JobImpl implements Job, Com
     /** Internal job property containing the resource path. */
     public static final String PROPERTY_RESOURCE_PATH = "slingevent:path";
 
-    /** Internal job property if this is an bridged event (event admin). */
-    public static final String PROPERTY_BRIDGED_EVENT = "slingevent:eventadmin";
-
     /** Internal job property containing optional delay override. */
     public static final String PROPERTY_DELAY_OVERRIDE = ":slingevent:delayOverride";
 
@@ -65,12 +61,8 @@ public class JobImpl implements Job, Com
 
     private final String path;
 
-    private final String name;
-
     private final String jobId;
 
-    private final boolean isBridgedEvent;
-
     private final List<Exception> readErrorList;
 
     private final long counter;
@@ -85,14 +77,11 @@ public class JobImpl implements Job, Com
      */
     @SuppressWarnings("unchecked")
     public JobImpl(final String topic,
-                   final String name,
                    final String jobId,
                    final Map<String, Object> properties) {
         this.topic = topic;
-        this.name = name;
         this.jobId = jobId;
         this.path = (String)properties.remove(PROPERTY_RESOURCE_PATH);
-        this.isBridgedEvent = properties.get(PROPERTY_BRIDGED_EVENT) != null;
         this.readErrorList = (List<Exception>) properties.remove(ResourceHelper.PROPERTY_MARKER_READ_ERROR_LIST);
 
         this.properties = new ValueMapDecorator(properties);
@@ -109,13 +98,6 @@ public class JobImpl implements Job, Com
     }
 
     /**
-     * Is this a bridged event?
-     */
-    public boolean isBridgedEvent() {
-        return this.isBridgedEvent;
-    }
-
-    /**
      * Did we have read errors?
      */
     public boolean hasReadErrors() {
@@ -147,14 +129,6 @@ public class JobImpl implements Job, Com
     }
 
     /**
-     * @see org.apache.sling.event.jobs.Job#getName()
-     */
-    @Override
-    public String getName() {
-        return this.name;
-    }
-
-    /**
      * @see org.apache.sling.event.jobs.Job#getId()
      */
     @Override
@@ -195,11 +169,6 @@ public class JobImpl implements Job, Com
     }
 
     @Override
-    public JobPriority getJobPriority() {
-        return JobPriority.NORM;
-    }
-
-    @Override
     public int getRetryCount() {
         return this.getProperty(Job.PROPERTY_JOB_RETRY_COUNT, Integer.class);
     }
@@ -421,7 +390,6 @@ public class JobImpl implements Job, Com
     @Override
     public String toString() {
         return "JobImpl [properties=" + properties + ", topic=" + topic
-                + ", path=" + path + ", name=" + name + ", jobId=" + jobId
-                + ", isBridgedEvent=" + isBridgedEvent + "]";
+                + ", path=" + path + ", jobId=" + jobId + "]";
     }
 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Mon Oct 26 10:16:42 2015
@@ -40,7 +40,6 @@ import org.apache.sling.api.resource.Per
 import org.apache.sling.api.resource.QuerySyntaxException;
 import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
-import org.apache.sling.api.resource.ValueMap;
 import org.apache.sling.commons.scheduler.Scheduler;
 import org.apache.sling.commons.threads.ThreadPoolManager;
 import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
@@ -58,8 +57,6 @@ import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.Job.JobState;
 import org.apache.sling.event.jobs.JobBuilder;
 import org.apache.sling.event.jobs.JobManager;
-import org.apache.sling.event.jobs.JobUtil;
-import org.apache.sling.event.jobs.JobsIterator;
 import org.apache.sling.event.jobs.NotificationConstants;
 import org.apache.sling.event.jobs.Queue;
 import org.apache.sling.event.jobs.ScheduledJobInfo;
@@ -165,24 +162,6 @@ public class JobManagerImpl
     }
 
     /**
-     * @see org.apache.sling.event.jobs.JobManager#restart()
-     */
-    @Override
-    public void restart() {
-        // nothing to do as this is deprecated, let's log a warning
-        Utility.logDeprecated(logger, "Deprecated JobManager.restart() is called.");
-    }
-
-    /**
-     * @see org.apache.sling.event.jobs.JobManager#isJobProcessingEnabled()
-     */
-    @Override
-    public boolean isJobProcessingEnabled() {
-        Utility.logDeprecated(logger, "Deprecated JobManager.isJobProcessingEnabled() is called.");
-        return true;
-    }
-
-    /**
      * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
      */
     @Override
@@ -224,84 +203,6 @@ public class JobManagerImpl
         return qManager.getQueues();
     }
 
-    @Override
-    public JobsIterator queryJobs(final QueryType type, final String topic, final Map<String, Object>... templates) {
-        return this.queryJobs(type, topic, -1, templates);
-    }
-
-    @Override
-    public JobsIterator queryJobs(final QueryType type, final String topic,
-            final long limit,
-            final Map<String, Object>... templates) {
-        Utility.logDeprecated(logger, "Deprecated JobManager.queryJobs(...) is called.");
-        final Collection<Job> list = this.findJobs(type, topic, limit, templates);
-        final Iterator<Job> iter = list.iterator();
-        return new JobsIterator() {
-
-            private int index;
-
-            @Override
-            public Iterator<Event> iterator() {
-                return this;
-            }
-
-            @Override
-            public void remove() {
-                throw new UnsupportedOperationException();
-            }
-
-            @Override
-            public Event next() {
-                index++;
-                final Job job = iter.next();
-                return Utility.toEvent(job);
-            }
-
-            @Override
-            public boolean hasNext() {
-                return iter.hasNext();
-            }
-
-            @Override
-            public void skip(final long skipNum) {
-                long m = skipNum;
-                while ( m > 0 && this.hasNext() ) {
-                    this.next();
-                    m--;
-                }
-            }
-
-            @Override
-            public long getSize() {
-                return list.size();
-            }
-
-            @Override
-            public long getPosition() {
-                return index;
-            }
-        };
-    }
-
-    @Override
-    public Event findJob(final String topic, final Map<String, Object> template) {
-        Utility.logDeprecated(logger, "Deprecated JobManager.findJob(...) is called.");
-        final Job job = this.getJob(topic, template);
-        if ( job != null ) {
-            return Utility.toEvent(job);
-        }
-        return null;
-    }
-
-    /**
-     * @see org.apache.sling.event.jobs.JobManager#removeJob(java.lang.String)
-     */
-    @Override
-    public boolean removeJob(final String jobId) {
-        Utility.logDeprecated(logger, "Deprecated JobManager.removeJob(...) is called.");
-        return this.internalRemoveJobById(jobId, false);
-    }
-
     /**
      * Remove a job.
      * If the job is already in the storage area, it's removed forever.
@@ -359,67 +260,11 @@ public class JobManagerImpl
     }
 
     /**
-     * @see org.apache.sling.event.jobs.JobManager#forceRemoveJob(java.lang.String)
-     */
-    @Override
-    public void forceRemoveJob(final String jobId) {
-        Utility.logDeprecated(logger, "Deprecated JobManager.forceRemoveJob(...) is called.");
-        this.internalRemoveJobById(jobId, true);
-    }
-
-    /**
      * @see org.apache.sling.event.jobs.JobManager#addJob(java.lang.String, java.util.Map)
      */
     @Override
     public Job addJob(String topic, Map<String, Object> properties) {
-        return this.addJob(topic, null, properties, null);
-    }
-
-    /**
-     * @see org.apache.sling.event.jobs.JobManager#addJob(java.lang.String, java.lang.String, java.util.Map)
-     */
-    @Override
-    public Job addJob(final String topic, final String name, final Map<String, Object> properties) {
-        Utility.logDeprecated(logger, "Deprecated JobManager.add(String, String, Map) is called.");
-        return this.addJob(topic, name, properties, null);
-    }
-
-    /**
-     * @see org.apache.sling.event.jobs.JobManager#getJobByName(java.lang.String)
-     */
-    @Override
-    public Job getJobByName(final String name) {
-        Utility.logDeprecated(logger, "Deprecated JobManager.getJobByName(String) is called.");
-        final StringBuilder buf = new StringBuilder(64);
-
-        final ResourceResolver resolver = this.configuration.createResourceResolver();
-        try {
-
-            buf.append("//element(*,");
-            buf.append(ResourceHelper.RESOURCE_TYPE_JOB);
-            buf.append(")[@");
-            buf.append(ISO9075.encode(JobUtil.PROPERTY_JOB_NAME));
-            buf.append(" = '");
-            buf.append(name);
-            buf.append("']");
-            final Iterator<Resource> result = resolver.findResources(buf.toString(), "xpath");
-
-            while ( result.hasNext() ) {
-                final Resource jobResource = result.next();
-                // sanity check for the path
-                if ( this.configuration.isJob(jobResource.getPath()) ) {
-                    final JobImpl job = Utility.readJob(logger, jobResource);
-                    if ( job != null ) {
-                        return job;
-                    }
-                }
-            }
-        } catch (final QuerySyntaxException qse) {
-            logger.warn("Query syntax wrong " + buf.toString(), qse);
-        } finally {
-            resolver.close();
-        }
-        return null;
+        return this.addJob(topic, properties, null);
     }
 
     /**
@@ -675,68 +520,6 @@ public class JobManagerImpl
         return result;
     }
 
-
-
-    /**
-     * Try to get a "lock" for a resource
-     */
-    private boolean lock(final String jobTopic, final String id) {
-        if ( logger.isDebugEnabled() ) {
-            logger.debug("Trying to get lock for {}", id);
-        }
-        boolean hasLock = false;
-        final ResourceResolver resolver = this.configuration.createResourceResolver();
-        try {
-            final String lockName = ResourceHelper.filterName(id);
-            final StringBuilder sb = new StringBuilder(this.configuration.getLocksPath());
-            sb.append('/');
-            sb.append(jobTopic.replace('/', '.'));
-            sb.append('/');
-            sb.append(lockName);
-            final String path = sb.toString();
-
-            Resource lockResource = resolver.getResource(path);
-            if ( lockResource == null ) {
-                resolver.refresh();
-                try {
-                    final Map<String, Object> props = new HashMap<String, Object>();
-                    props.put(Utility.PROPERTY_LOCK_CREATED, Calendar.getInstance());
-                    props.put(Utility.PROPERTY_LOCK_CREATED_APP, Environment.APPLICATION_ID);
-                    props.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, Utility.RESOURCE_TYPE_LOCK);
-
-                    lockResource = ResourceHelper.getOrCreateResource(resolver,
-                            path,
-                            props);
-
-                    // check if lock resource has correct name (SNS)
-                    if ( !lockResource.getName().equals(lockName) ) {
-                        if ( logger.isDebugEnabled() ) {
-                            logger.debug("Created SNS lock resource on instance {} - discarding", Environment.APPLICATION_ID);
-                        }
-                        resolver.delete(lockResource);
-                        resolver.commit();
-                    } else {
-                        final ValueMap vm = lockResource.adaptTo(ValueMap.class);
-                        if ( logger.isDebugEnabled() ) {
-                            logger.debug("Got lock resource on instance {} with {}", Environment.APPLICATION_ID, vm.get(Utility.PROPERTY_LOCK_CREATED_APP));
-                        }
-                        if ( vm.get(Utility.PROPERTY_LOCK_CREATED_APP).equals(Environment.APPLICATION_ID) ) {
-                            hasLock = true;
-                        }
-                    }
-                } catch (final PersistenceException ignore) {
-                    // ignore
-                }
-            }
-        } finally {
-            resolver.close();
-        }
-        if ( logger.isDebugEnabled() ) {
-            logger.debug("Lock for {} = {}", id, hasLock);
-        }
-        return hasLock;
-    }
-
     /**
      * Persist the job in the resource tree
      * @param jobTopic The required job topic
@@ -745,50 +528,44 @@ public class JobManagerImpl
      * @return The persisted job or <code>null</code>.
      */
     private Job addJobInteral(final String jobTopic,
-            final String jobName,
             final Map<String, Object> jobProperties,
             final List<String> errors) {
         final QueueInfo info = this.configuration.getQueueConfigurationManager().getQueueInfo(jobTopic);
-        // check for unique jobs
-        if ( jobName != null && !this.lock(jobTopic, jobName) ) {
-            logger.debug("Discarding duplicate job {}", Utility.toString(jobTopic, jobName, jobProperties));
-            return null;
-        } else {
-            final TopologyCapabilities caps = this.configuration.getTopologyCapabilities();
-            info.targetId = (caps == null ? null : caps.detectTarget(jobTopic, jobProperties, info));
 
-            if ( logger.isDebugEnabled() ) {
-                if ( info.targetId != null ) {
-                    logger.debug("Persisting job {} into queue {}, target={}", new Object[] {Utility.toString(jobTopic, jobName, jobProperties), info.queueName, info.targetId});
-                } else {
-                    logger.debug("Persisting job {} into queue {}", Utility.toString(jobTopic, jobName, jobProperties), info.queueName);
-                }
-            }
-            final ResourceResolver resolver = this.configuration.createResourceResolver();
-            try {
-                final JobImpl job = this.writeJob(resolver,
-                        jobTopic,
-                        jobName,
-                        jobProperties,
-                        info);
-                if ( info.targetId != null ) {
-                    this.configuration.getAuditLogger().debug("ASSIGN OK {} : {}",
-                            info.targetId, job.getId());
-                } else {
-                    this.configuration.getAuditLogger().debug("UNASSIGN OK : {}",
-                            job.getId());
-                }
-                return job;
-            } catch (final PersistenceException re ) {
-                // something went wrong, so let's log it
-                this.logger.error("Exception during persisting new job '" + Utility.toString(jobTopic, jobName, jobProperties) + "'", re);
-            } finally {
-                resolver.close();
+        final TopologyCapabilities caps = this.configuration.getTopologyCapabilities();
+        info.targetId = (caps == null ? null : caps.detectTarget(jobTopic, jobProperties, info));
+
+        if ( logger.isDebugEnabled() ) {
+            if ( info.targetId != null ) {
+                logger.debug("Persisting job {} into queue {}, target={}", new Object[] {Utility.toString(jobTopic, jobProperties), info.queueName, info.targetId});
+            } else {
+                logger.debug("Persisting job {} into queue {}", Utility.toString(jobTopic, jobProperties), info.queueName);
             }
-            if ( errors != null ) {
-                errors.add("Unable to persist new job.");
+        }
+        final ResourceResolver resolver = this.configuration.createResourceResolver();
+        try {
+            final JobImpl job = this.writeJob(resolver,
+                    jobTopic,
+                    jobProperties,
+                    info);
+            if ( info.targetId != null ) {
+                this.configuration.getAuditLogger().debug("ASSIGN OK {} : {}",
+                        info.targetId, job.getId());
+            } else {
+                this.configuration.getAuditLogger().debug("UNASSIGN OK : {}",
+                        job.getId());
             }
+            return job;
+        } catch (final PersistenceException re ) {
+            // something went wrong, so let's log it
+            this.logger.error("Exception during persisting new job '" + Utility.toString(jobTopic, jobProperties) + "'", re);
+        } finally {
+            resolver.close();
+        }
+        if ( errors != null ) {
+            errors.add("Unable to persist new job.");
         }
+
         return null;
     }
 
@@ -801,7 +578,6 @@ public class JobManagerImpl
      */
     private JobImpl writeJob(final ResourceResolver resolver,
             final String jobTopic,
-            final String jobName,
             final Map<String, Object> jobProperties,
             final QueueInfo info)
     throws PersistenceException {
@@ -822,9 +598,6 @@ public class JobManagerImpl
 
         properties.put(ResourceHelper.PROPERTY_JOB_ID, jobId);
         properties.put(ResourceHelper.PROPERTY_JOB_TOPIC, jobTopic);
-        if ( jobName != null ) {
-            properties.put(JobUtil.PROPERTY_JOB_NAME, jobName);
-        }
         properties.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueConfiguration.getName());
         properties.put(Job.PROPERTY_JOB_RETRY_COUNT, 0);
         properties.put(Job.PROPERTY_JOB_RETRIES, info.queueConfiguration.getMaxRetries());
@@ -841,7 +614,7 @@ public class JobManagerImpl
         // create path and resource
         properties.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, ResourceHelper.RESOURCE_TYPE_JOB);
         if ( logger.isDebugEnabled() ) {
-            logger.debug("Storing new job {} at {}", Utility.toString(jobTopic, jobName, properties), path);
+            logger.debug("Storing new job {} at {}", Utility.toString(jobTopic, properties), path);
         }
         ResourceHelper.getOrCreateResource(resolver,
                 path,
@@ -849,7 +622,7 @@ public class JobManagerImpl
 
         // update property types - priority, add path and create job
         properties.put(JobImpl.PROPERTY_RESOURCE_PATH, path);
-        return new JobImpl(jobTopic, jobName, jobId, properties);
+        return new JobImpl(jobTopic, jobId, properties);
     }
 
     /**
@@ -908,7 +681,7 @@ public class JobManagerImpl
     /**
      * Internal method to add a job
      */
-    public Job addJob(final String topic, final String name,
+    public Job addJob(final String topic,
             final Map<String, Object> properties,
             final List<String> errors) {
         final String errorMessage = Utility.checkJob(topic, properties);
@@ -917,46 +690,25 @@ public class JobManagerImpl
             if ( errors != null ) {
                 errors.add(errorMessage);
             }
-            this.configuration.getAuditLogger().debug("ADD FAILED topic={}{}{}, properties={} : {}",
+            this.configuration.getAuditLogger().debug("ADD FAILED topic={}, properties={} : {}",
                     new Object[] {topic,
-                                  name == null ? "" : ",name=",
-                                  name == null ? "" : name,
                                   properties,
                                   errorMessage});
             return null;
         }
-        if ( name != null ) {
-            Utility.logDeprecated(logger, "Job is using deprecated name feature: " + Utility.toString(topic, name, properties));
-        }
         final List<String> errorList = new ArrayList<String>();
-        Job result = this.addJobInteral(topic, name, properties, errorList);
+        Job result = this.addJobInteral(topic, properties, errorList);
         if ( errors != null ) {
             errors.addAll(errorList);
         }
         if ( result == null ) {
-            if ( name != null ) {
-                result = this.getJobByName(name);
-            }
-            if ( result == null ) {
-                this.configuration.getAuditLogger().debug("ADD FAILED topic={}{}{}, properties={} : {}",
-                        new Object[] {topic,
-                                      name == null ? "" : ",name=",
-                                      name == null ? "" : name,
-                                      properties,
-                                      errorList});
-            } else {
-                this.configuration.getAuditLogger().debug("ADD DUP topic={}{}{}, properties={} : {}",
-                        new Object[] {topic,
-                                      name == null ? "" : ",name=",
-                                      name == null ? "" : name,
-                                      properties,
-                                      result.getId()});
-            }
+            this.configuration.getAuditLogger().debug("ADD FAILED topic={}, properties={} : {}",
+                    new Object[] {topic,
+                                  properties,
+                                  errorList});
         } else {
-            this.configuration.getAuditLogger().debug("ADD OK topic={}{}{}, properties={} : {}",
+            this.configuration.getAuditLogger().debug("ADD OK topic={}, properties={} : {}",
                     new Object[] {topic,
-                                  name == null ? "" : ",name=",
-                                  name == null ? "" : name,
                                   properties,
                                   result.getId()});
         }
@@ -972,7 +724,7 @@ public class JobManagerImpl
         final JobImpl job = (JobImpl)this.getJobById(jobId);
         if ( job != null && this.configuration.isStoragePath(job.getResourcePath()) ) {
             this.internalRemoveJobById(jobId, true);
-            return this.addJob(job.getTopic(), job.getName(), job.getProperties());
+            return this.addJob(job.getTopic(), job.getProperties());
         }
         return null;
     }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java Mon Oct 26 10:16:42 2015
@@ -34,7 +34,6 @@ import org.apache.sling.api.resource.Res
 import org.apache.sling.api.resource.ValueMap;
 import org.apache.sling.event.impl.support.ResourceHelper;
 import org.apache.sling.event.jobs.Job;
-import org.apache.sling.event.jobs.JobUtil;
 import org.apache.sling.event.jobs.consumer.JobConsumer;
 import org.osgi.service.event.Event;
 import org.slf4j.Logger;
@@ -103,9 +102,6 @@ public abstract class Utility {
     public static Event toEvent(final Job job) {
         final Map<String, Object> eventProps = new HashMap<String, Object>();
         eventProps.putAll(((JobImpl)job).getProperties());
-        if ( job.getName() != null ) {
-            eventProps.put(JobUtil.PROPERTY_JOB_NAME, job.getName());
-        }
         eventProps.put(ResourceHelper.PROPERTY_JOB_ID, job.getId());
         eventProps.remove(JobConsumer.PROPERTY_JOB_ASYNC_HANDLER);
         return new Event(job.getTopic(), eventProps);
@@ -121,7 +117,6 @@ public abstract class Utility {
             boolean first = true;
             for(final String propName : properties.keySet()) {
                 if ( propName.equals(ResourceHelper.PROPERTY_JOB_ID)
-                    || propName.equals(JobUtil.PROPERTY_JOB_NAME)
                     || propName.equals(ResourceHelper.PROPERTY_JOB_TOPIC) ) {
                    continue;
                 }
@@ -153,15 +148,10 @@ public abstract class Utility {
      * This method prints out the job topic and all of the properties.
      */
     public static String toString(final String jobTopic,
-            final String name,
             final Map<String, Object> properties) {
         final StringBuilder sb = new StringBuilder("Sling Job ");
         sb.append("[topic=");
         sb.append(jobTopic);
-        if ( name != null ) {
-            sb.append(", name=");
-            sb.append(name);
-        }
         appendProperties(sb, properties);
 
         sb.append("]");
@@ -179,10 +169,6 @@ public abstract class Utility {
             sb.append(job.getTopic());
             sb.append(", id=");
             sb.append(job.getId());
-            if ( job.getName() != null ) {
-                sb.append(", name=");
-                sb.append(job.getName());
-            }
             appendProperties(sb, ((JobImpl)job).getProperties());
             sb.append("]");
             return sb.toString();
@@ -224,7 +210,6 @@ public abstract class Utility {
                         }
                     }
                     job = new JobImpl(topic,
-                            (String)jobProperties.get(JobUtil.PROPERTY_JOB_NAME),
                             jobId,
                             jobProperties);
                 } else {

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java Mon Oct 26 10:16:42 2015
@@ -30,10 +30,8 @@ import org.apache.felix.scr.annotations.
 import org.apache.felix.scr.annotations.PropertyUnbounded;
 import org.apache.felix.scr.annotations.Service;
 import org.apache.sling.commons.osgi.PropertiesUtil;
-import org.apache.sling.event.impl.jobs.Utility;
 import org.apache.sling.event.impl.support.TopicMatcher;
 import org.apache.sling.event.impl.support.TopicMatcherHelper;
-import org.apache.sling.event.jobs.JobUtil;
 import org.apache.sling.event.jobs.QueueConfiguration;
 import org.osgi.framework.Constants;
 import org.slf4j.Logger;
@@ -221,8 +219,8 @@ public class InternalQueueConfiguration
      * If it is invalid, it is ignored.
      */
     private boolean checkIsValid() {
-        if ( type == Type.IGNORE || type == Type.DROP ) {
-            Utility.logDeprecated(logger, "Queue is using deprecated queue type. Ignoring queue " + name + " with type " + type);
+        if ( type == Type._UNSUPPORTED_1 || type == Type._UNSUPPORTED_2 ) {
+            logger.error("Queue is using unsupported queue type. Ignoring queue " + name + " with type " + type);
             return false;
         }
         if ( type == null ) {
@@ -307,14 +305,6 @@ public class InternalQueueConfiguration
     }
 
     /**
-     * @see org.apache.sling.event.jobs.QueueConfiguration#getPriority()
-     */
-    @Override
-    public JobUtil.JobPriority getPriority() {
-        return JobUtil.JobPriority.valueOf(this.priority.name());
-    }
-
-    /**
      * @see org.apache.sling.event.jobs.QueueConfiguration#getMaxParallel()
      */
     @Override
@@ -322,12 +312,6 @@ public class InternalQueueConfiguration
         return this.maxParallelProcesses;
     }
 
-    @Override
-    @Deprecated
-    public boolean isLocalQueue() {
-        return false;
-    }
-
     /**
      * @see org.apache.sling.event.jobs.QueueConfiguration#getTopics()
      */
@@ -349,12 +333,6 @@ public class InternalQueueConfiguration
     }
 
     @Override
-    @Deprecated
-    public String[] getApplicationIds() {
-        return null;
-    }
-
-    @Override
     public boolean isKeepJobs() {
         return this.keepJobs;
     }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilities.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilities.java?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilities.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/TopologyCapabilities.java Mon Oct 26 10:16:42 2015
@@ -28,7 +28,6 @@ import java.util.TreeMap;
 
 import org.apache.sling.discovery.InstanceDescription;
 import org.apache.sling.discovery.TopologyView;
-import org.apache.sling.event.impl.jobs.JobImpl;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
 import org.apache.sling.event.impl.support.Environment;
 import org.apache.sling.event.jobs.QueueConfiguration;
@@ -215,7 +214,7 @@ public class TopologyCapabilities {
      * Return the potential targets (Sling IDs) sorted by ID
      * @return A list of instance descriptions. The list might be empty.
      */
-    public List<InstanceDescription> getPotentialTargets(final String jobTopic, final Map<String, Object> jobProperties) {
+    public List<InstanceDescription> getPotentialTargets(final String jobTopic) {
         // calculate potential targets
         final List<InstanceDescription> potentialTargets = new ArrayList<InstanceDescription>();
 
@@ -236,11 +235,6 @@ public class TopologyCapabilities {
                 pos = jobTopic.lastIndexOf('/', pos - 1);
             } while ( pos > 0 );
         }
-        // third: bridged consumers
-        final List<InstanceDescription> bridgedTargets = (jobProperties != null && jobProperties.containsKey(JobImpl.PROPERTY_BRIDGED_EVENT) ? this.instanceCapabilities.get("/") : null);
-        if ( bridgedTargets != null ) {
-            potentialTargets.addAll(bridgedTargets);
-        }
         Collections.sort(potentialTargets, this.instanceComparator);
 
         return potentialTargets;
@@ -251,7 +245,7 @@ public class TopologyCapabilities {
      */
     public String detectTarget(final String jobTopic, final Map<String, Object> jobProperties,
             final QueueInfo queueInfo) {
-        final List<InstanceDescription> potentialTargets = this.getPotentialTargets(jobTopic, jobProperties);
+        final List<InstanceDescription> potentialTargets = this.getPotentialTargets(jobTopic);
         logger.debug("Potential targets for {} : {}", jobTopic, potentialTargets);
         String createdOn = null;
         if ( jobProperties != null ) {

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/InventoryPlugin.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/InventoryPlugin.java?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/InventoryPlugin.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/InventoryPlugin.java Mon Oct 26 10:16:42 2015
@@ -88,8 +88,8 @@ public class InventoryPlugin implements
             case ORDERED : return "Ordered";
             case TOPIC_ROUND_ROBIN : return "Topic Round Robin";
             case UNORDERED : return "Parallel";
-            case IGNORE : return "Ignore";
-            case DROP : return "Drop";
+            case _UNSUPPORTED_1 : return "????";
+            case _UNSUPPORTED_2 : return "????";
         }
         return type.toString();
     }
@@ -290,7 +290,7 @@ public class InventoryPlugin implements
         pw.printf("Max Parallel : %s%n", c.getMaxParallel());
         pw.printf("Max Retries : %s%n", c.getMaxRetries());
         pw.printf("Retry Delay : %s ms%n", c.getRetryDelayInMs());
-        pw.printf("Priority : %s%n", c.getPriority());
+        pw.printf("Priority : %s%n", c.getThreadPriority());
         pw.printf("Ranking : %s%n", c.getRanking());
 
         pw.println();
@@ -444,7 +444,7 @@ public class InventoryPlugin implements
         pw.printf("      \"maxParallel\" : %s,%n", c.getMaxParallel());
         pw.printf("      \"maxRetries\" : %s,%n", c.getMaxRetries());
         pw.printf("      \"retryDelayInMs\" : %s,%n", c.getRetryDelayInMs());
-        pw.printf("      \"priority\" : \"%s\",%n", c.getPriority());
+        pw.printf("      \"priority\" : \"%s\",%n", c.getThreadPriority());
         pw.printf("      \"ranking\" : %s%n", c.getRanking());
         pw.print("    }");
     }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java Mon Oct 26 10:16:42 2015
@@ -372,7 +372,7 @@ public class WebConsolePlugin extends Ht
         pw.printf("<tr><td>Max Parallel</td><td>%s</td></tr>", c.getMaxParallel());
         pw.printf("<tr><td>Max Retries</td><td>%s</td></tr>", c.getMaxRetries());
         pw.printf("<tr><td>Retry Delay</td><td>%s ms</td></tr>", c.getRetryDelayInMs());
-        pw.printf("<tr><td>Priority</td><td>%s</td></tr>", c.getPriority());
+        pw.printf("<tr><td>Priority</td><td>%s</td></tr>", c.getThreadPriority());
         pw.printf("<tr><td>Ranking</td><td>%s</td></tr>", c.getRanking());
 
         pw.println("</tbody></table>");
@@ -403,8 +403,8 @@ public class WebConsolePlugin extends Ht
             case ORDERED : return "Ordered";
             case TOPIC_ROUND_ROBIN : return "Topic Round Robin";
             case UNORDERED : return "Parallel";
-            case IGNORE : return "Ignore";
-            case DROP : return "Drop";
+            case _UNSUPPORTED_1 : return "????";
+            case _UNSUPPORTED_2 : return "????";
         }
         return type.toString();
     }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/notifications/NotificationUtility.java Mon Oct 26 10:16:42 2015
@@ -19,13 +19,10 @@
 package org.apache.sling.event.impl.jobs.notifications;
 
 import java.util.Dictionary;
-import java.util.HashMap;
 import java.util.Hashtable;
-import java.util.Map;
 
 import org.apache.sling.event.impl.jobs.JobImpl;
 import org.apache.sling.event.jobs.Job;
-import org.apache.sling.event.jobs.JobUtil;
 import org.apache.sling.event.jobs.NotificationConstants;
 import org.apache.sling.event.jobs.consumer.JobConsumer;
 import org.osgi.service.event.Event;
@@ -42,33 +39,11 @@ public abstract class NotificationUtilit
      */
     public static void sendNotification(final EventAdmin eventAdmin,
             final String eventTopic,
-            final String jobTopic,
-            final String jobName,
-            final Map<String, Object> jobProperties,
-            final Long time) {
-        if ( eventAdmin != null ) {
-            // create job object
-            final Map<String, Object> jobProps;
-            if ( jobProperties == null ) {
-                jobProps = new HashMap<String, Object>();
-            } else {
-                jobProps = jobProperties;
-            }
-            final Job job = new JobImpl(jobTopic, jobName, "<unknown>", jobProps);
-            sendNotificationInternal(eventAdmin, eventTopic, job, time);
-        }
-    }
-
-    /**
-     * Helper method for sending the notification events.
-     */
-    public static void sendNotification(final EventAdmin eventAdmin,
-            final String eventTopic,
             final Job job,
             final Long time) {
         if ( eventAdmin != null ) {
             // create new copy of job object
-            final Job jobCopy = new JobImpl(job.getTopic(), job.getName(), job.getId(), ((JobImpl)job).getProperties());
+            final Job jobCopy = new JobImpl(job.getTopic(), job.getId(), ((JobImpl)job).getProperties());
             sendNotificationInternal(eventAdmin, eventTopic, jobCopy, time);
         }
     }
@@ -84,9 +59,6 @@ public abstract class NotificationUtilit
         // add basic job properties
         eventProps.put(NotificationConstants.NOTIFICATION_PROPERTY_JOB_ID, job.getId());
         eventProps.put(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC, job.getTopic());
-        if ( job.getName() != null ) {
-            eventProps.put(JobUtil.NOTIFICATION_PROPERTY_JOB_NAME, job.getName());
-        }
         // copy payload
         for(final String name : job.getPropertyNames()) {
             eventProps.put(name, job.getProperty(name));

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java Mon Oct 26 10:16:42 2015
@@ -20,9 +20,7 @@ package org.apache.sling.event.impl.jobs
 
 import java.util.Calendar;
 import java.util.Date;
-import java.util.Dictionary;
 import java.util.HashMap;
-import java.util.Hashtable;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Semaphore;
@@ -35,7 +33,6 @@ import org.apache.sling.api.resource.Per
 import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.commons.threads.ThreadPool;
-import org.apache.sling.event.EventUtil;
 import org.apache.sling.event.impl.EventingThreadPool;
 import org.apache.sling.event.impl.jobs.InternalJobState;
 import org.apache.sling.event.impl.jobs.JobHandler;
@@ -43,19 +40,14 @@ import org.apache.sling.event.impl.jobs.
 import org.apache.sling.event.impl.jobs.JobTopicTraverser;
 import org.apache.sling.event.impl.jobs.Utility;
 import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
-import org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifier;
-import org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifierImpl;
 import org.apache.sling.event.impl.jobs.notifications.NotificationUtility;
 import org.apache.sling.event.impl.support.BatchResourceRemover;
 import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.Job.JobState;
-import org.apache.sling.event.jobs.JobProcessor;
-import org.apache.sling.event.jobs.JobUtil;
 import org.apache.sling.event.jobs.NotificationConstants;
 import org.apache.sling.event.jobs.Queue;
 import org.apache.sling.event.jobs.QueueConfiguration.Type;
 import org.apache.sling.event.jobs.Statistics;
-import org.osgi.service.event.Event;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,9 +61,6 @@ public class JobQueueImpl
     /** Default timeout for suspend. */
     private static final long MAX_SUSPEND_TIME = 1000 * 60 * 60; // 60 mins
 
-    /** Default number of milliseconds to wait for an ack. */
-    private static final long DEFAULT_WAIT_FOR_ACK_IN_MS = 60 * 1000; // by default we wait 60 secs
-
     /** The logger. */
     private final Logger logger;
 
@@ -267,159 +256,71 @@ public class JobQueueImpl
                 final JobImpl job = handler.getJob();
                 handler.started = System.currentTimeMillis();
 
-                if ( handler.getConsumer() != null ) {
-                    this.services.configuration.getAuditLogger().debug("START OK : {}", job.getId());
-                    // sanity check for the queued property
-                    Calendar queued = job.getProperty(JobImpl.PROPERTY_JOB_QUEUED, Calendar.class);
-                    if ( queued == null ) {
-                        // we simply use a date of ten seconds ago
-                        queued = Calendar.getInstance();
-                        queued.setTimeInMillis(System.currentTimeMillis() - 10000);
-                    }
-                    final long queueTime = handler.started - queued.getTimeInMillis();
-                    // update statistics
-                    this.services.statisticsManager.jobStarted(this.queueName, job.getTopic(), queueTime);
-                    // send notification
-                    NotificationUtility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_STARTED, job, queueTime);
-
-                    synchronized ( this.processingJobsLists ) {
-                        this.processingJobsLists.put(job.getId(), handler);
-                    }
+                this.services.configuration.getAuditLogger().debug("START OK : {}", job.getId());
+                // sanity check for the queued property
+                Calendar queued = job.getProperty(JobImpl.PROPERTY_JOB_QUEUED, Calendar.class);
+                if ( queued == null ) {
+                    // we simply use a date of ten seconds ago
+                    queued = Calendar.getInstance();
+                    queued.setTimeInMillis(System.currentTimeMillis() - 10000);
+                }
+                final long queueTime = handler.started - queued.getTimeInMillis();
+                // update statistics
+                this.services.statisticsManager.jobStarted(this.queueName, job.getTopic(), queueTime);
+                // send notification
+                NotificationUtility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_STARTED, job, queueTime);
 
-                    JobExecutionResultImpl result = JobExecutionResultImpl.CANCELLED;
-                    Job.JobState resultState = Job.JobState.ERROR;
-                    final JobExecutionContextImpl ctx = new JobExecutionContextImpl(handler, new JobExecutionContextImpl.ASyncHandler() {
-
-                        @Override
-                        public void finished(final JobState state) {
-                            services.jobConsumerManager.unregisterListener(job.getId());
-                            finishedJob(job.getId(), state, true);
-                            asyncCounter.decrementAndGet();
-                        }
-                    });
+                synchronized ( this.processingJobsLists ) {
+                    this.processingJobsLists.put(job.getId(), handler);
+                }
 
-                    try {
-                        synchronized ( ctx ) {
-                            result = (JobExecutionResultImpl)handler.getConsumer().process(job, ctx);
-                            if ( result == null ) { // ASYNC processing
-                                services.jobConsumerManager.registerListener(job.getId(), handler.getConsumer(), ctx);
-                                asyncCounter.incrementAndGet();
-                                ctx.markAsync();
-                            } else {
-                                if ( result.succeeded() ) {
-                                    resultState = Job.JobState.SUCCEEDED;
-                                } else if ( result.failed() ) {
-                                    resultState = Job.JobState.QUEUED;
-                                } else if ( result.cancelled() ) {
-                                    if ( handler.isStopped() ) {
-                                        resultState = Job.JobState.STOPPED;
-                                    } else {
-                                        resultState = Job.JobState.ERROR;
-                                    }
-                                }
-                            }
-                        }
-                    } catch (final Throwable t) { //NOSONAR
-                        logger.error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + Utility.toString(job), t);
-                        // we don't reschedule if an exception occurs
-                        result = JobExecutionResultImpl.CANCELLED;
-                        resultState = Job.JobState.ERROR;
-                    } finally {
-                        if ( result != null ) {
-                            if ( result.getRetryDelayInMs() != null ) {
-                                job.setProperty(JobImpl.PROPERTY_DELAY_OVERRIDE, result.getRetryDelayInMs());
-                            }
-                            if ( result.getMessage() != null ) {
-                               job.setProperty(Job.PROPERTY_RESULT_MESSAGE, result.getMessage());
-                            }
-                            this.finishedJob(job.getId(), resultState, false);
-                        }
+                JobExecutionResultImpl result = JobExecutionResultImpl.CANCELLED;
+                Job.JobState resultState = Job.JobState.ERROR;
+                final JobExecutionContextImpl ctx = new JobExecutionContextImpl(handler, new JobExecutionContextImpl.ASyncHandler() {
+
+                    @Override
+                    public void finished(final JobState state) {
+                        services.jobConsumerManager.unregisterListener(job.getId());
+                        finishedJob(job.getId(), state, true);
+                        asyncCounter.decrementAndGet();
                     }
+                });
 
-                } else {
-                    final Event jobEvent = this.getJobEvent(handler);
-                    final JobStatusNotifierImpl notifier = (JobStatusNotifierImpl) jobEvent.getProperty(JobStatusNotifier.CONTEXT_PROPERTY_NAME);
-                    // we need async delivery, otherwise we might create a deadlock
-                    // as this method runs inside a synchronized block and the finishedJob
-                    // method as well!
-                    final long endOfAck = System.currentTimeMillis() + DEFAULT_WAIT_FOR_ACK_IN_MS;
-                    this.services.eventAdmin.postEvent(jobEvent);
-
-                    // wait for the ack
-                    synchronized ( notifier ) {
-                        while ( System.currentTimeMillis() < endOfAck && !notifier.isCalled() ) {
-                            try {
-                                notifier.wait(endOfAck - System.currentTimeMillis());
-                            } catch ( final InterruptedException ie) {
-                                Thread.currentThread().interrupt();
-                                ignoreException(ie);
+                try {
+                    synchronized ( ctx ) {
+                        result = (JobExecutionResultImpl)handler.getConsumer().process(job, ctx);
+                        if ( result == null ) { // ASYNC processing
+                            services.jobConsumerManager.registerListener(job.getId(), handler.getConsumer(), ctx);
+                            asyncCounter.incrementAndGet();
+                            ctx.markAsync();
+                        } else {
+                            if ( result.succeeded() ) {
+                                resultState = Job.JobState.SUCCEEDED;
+                            } else if ( result.failed() ) {
+                                resultState = Job.JobState.QUEUED;
+                            } else if ( result.cancelled() ) {
+                                if ( handler.isStopped() ) {
+                                    resultState = Job.JobState.STOPPED;
+                                } else {
+                                    resultState = Job.JobState.ERROR;
+                                }
                             }
                         }
-                        if ( !notifier.isCalled() ) {
-                            notifier.markDone();
-                        }
                     }
-                    if ( !notifier.isCalled() ) {
-                        if ( handler.reschedule() ) {
-                            this.logger.info("No acknowledge received for job {} stored at {}. Requeueing job.", Utility.toString(handler.getJob()), handler.getJob().getId());
-                            handler.getJob().retry();
-                            this.requeue(handler);
-                        }
-                    } else {
-                        if ( logger.isDebugEnabled() ) {
-                            logger.debug("Received ack for job {}", Utility.toString(job));
-                        }
-                        this.services.configuration.getAuditLogger().debug("START OK : {}", job.getId());
-                        // sanity check for the queued property
-                        Calendar queued = job.getProperty(JobImpl.PROPERTY_JOB_QUEUED, Calendar.class);
-                        if ( queued == null ) {
-                            // we simply use a date of ten seconds ago
-                            queued = Calendar.getInstance();
-                            queued.setTimeInMillis(System.currentTimeMillis() - 10000);
-                        }
-                        final long queueTime = handler.started - queued.getTimeInMillis();
-                        // update statistics
-                        this.services.statisticsManager.jobStarted(this.queueName, job.getTopic(), queueTime);
-                        // send notification
-                        NotificationUtility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_STARTED, job, queueTime);
-
-                        synchronized ( this.processingJobsLists ) {
-                            this.processingJobsLists.put(job.getId(), handler);
+                } catch (final Throwable t) { //NOSONAR
+                    logger.error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + Utility.toString(job), t);
+                    // we don't reschedule if an exception occurs
+                    result = JobExecutionResultImpl.CANCELLED;
+                    resultState = Job.JobState.ERROR;
+                } finally {
+                    if ( result != null ) {
+                        if ( result.getRetryDelayInMs() != null ) {
+                            job.setProperty(JobImpl.PROPERTY_DELAY_OVERRIDE, result.getRetryDelayInMs());
                         }
-
-                        // check for processor
-                        final JobProcessor processor = notifier.getProcessor();
-                        if ( processor != null ) {
-                            boolean result = false;
-                            try {
-                                result = processor.process(jobEvent);
-                            } catch (Throwable t) { //NOSONAR
-                                LoggerFactory.getLogger(JobUtil.class).error("Unhandled error occured in job processor " + t.getMessage() + " while processing job " + job, t);
-                                // we don't reschedule if an exception occurs
-                                result = true;
-                            }
-                            if ( result ) {
-                                this.finishedJob(job.getId(), Job.JobState.SUCCEEDED, false);
-                            } else {
-                                this.finishedJob(job.getId(), Job.JobState.QUEUED, false);
-                            }
-                        } else {
-                            // async processing
-                            final JobExecutionContextImpl ctx = new JobExecutionContextImpl(handler, new JobExecutionContextImpl.ASyncHandler() {
-
-                                @Override
-                                public void finished(final JobState state) {
-                                    services.jobConsumerManager.unregisterListener(job.getId());
-                                    finishedJob(job.getId(), state, true);
-                                    asyncCounter.decrementAndGet();
-                                }
-                            });
-                            services.jobConsumerManager.registerListener(job.getId(), handler.getConsumer(), ctx);
-                            asyncCounter.incrementAndGet();
-                            ctx.markAsync();
-
-                            notifier.setJobExecutionContext(ctx);
+                        if ( result.getMessage() != null ) {
+                           job.setProperty(Job.PROPERTY_RESULT_MESSAGE, result.getMessage());
                         }
+                        this.finishedJob(job.getId(), resultState, false);
                     }
                 }
             } catch (final Exception re) {
@@ -628,30 +529,6 @@ public class JobQueueImpl
     }
 
     /**
-     * Create the real job event.
-     * This generates a new event object with the same properties, but with the
-     * {@link EventUtil#PROPERTY_JOB_TOPIC} topic.
-     * @param info The job event.
-     * @return The real job event.
-     */
-    private Event getJobEvent(final JobHandler info) {
-        final String eventTopic = info.getJob().getTopic();
-        final Dictionary<String, Object> properties = new Hashtable<String, Object>();
-        for(final String name : info.getJob().getPropertyNames()) {
-            properties.put(name, info.getJob().getProperty(name));
-        }
-
-        // put properties for finished job callback
-        properties.put(JobStatusNotifier.CONTEXT_PROPERTY_NAME, new JobStatusNotifierImpl());
-
-        // remove app id and distributable flag
-        properties.remove(EventUtil.PROPERTY_DISTRIBUTE);
-        properties.remove(EventUtil.PROPERTY_APPLICATION);
-
-        return new Event(eventTopic, properties);
-    }
-
-    /**
      * @see org.apache.sling.event.jobs.Queue#resume()
      */
     @Override
@@ -734,14 +611,6 @@ public class JobQueueImpl
     }
 
     /**
-     * @see org.apache.sling.event.jobs.Queue#clear()
-     */
-    @Override
-    public void clear() {
-        // this is a noop
-    }
-
-    /**
      * @see org.apache.sling.event.jobs.Queue#getState(java.lang.String)
      */
     @Override
@@ -782,16 +651,6 @@ public class JobQueueImpl
         return delay;
     }
 
-    /**
-     * Helper method which just logs the exception in debug mode.
-     * @param e
-     */
-    private void ignoreException(Exception e) {
-        if ( this.logger.isDebugEnabled() ) {
-            this.logger.debug("Ignored exception " + e.getMessage(), e);
-        }
-    }
-
     public boolean stopJob(final JobImpl job) {
         final JobHandler handler;
         synchronized ( this.processingJobsLists ) {

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java Mon Oct 26 10:16:42 2015
@@ -168,7 +168,7 @@ public class QueueJobCache {
                         final JobExecutor consumer = jobConsumerManager.getExecutor(job.getTopic());
 
                         handler = new JobHandler(job, consumer, this.configuration);
-                        if ( (consumer != null || (job.isBridgedEvent() && jobConsumerManager.supportsBridgedEvents())) ) {
+                        if ( consumer != null ) {
                             if ( !handler.startProcessing(queue) ) {
                                 statisticsManager.jobDequeued(queue.getName(), handler.getJob().getTopic());
                                 if ( logger.isDebugEnabled() ) {

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java Mon Oct 26 10:16:42 2015
@@ -18,7 +18,6 @@
  */
 package org.apache.sling.event.impl.jobs.tasks;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -29,7 +28,6 @@ import org.apache.sling.api.resource.Res
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.api.resource.ValueMap;
 import org.apache.sling.discovery.InstanceDescription;
-import org.apache.sling.event.impl.jobs.JobImpl;
 import org.apache.sling.event.impl.jobs.JobTopicTraverser;
 import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
@@ -113,24 +111,13 @@ public class CheckTopologyTask {
 
                     // this resource should exist, but we check anyway
                     if ( jobsRoot != null ) {
-                        // check if this instance supports bridged jobs
-                        final List<InstanceDescription> bridgedTargets = caps.getPotentialTargets("/", null);
-                        boolean flag = false;
-                        for(final InstanceDescription desc : bridgedTargets) {
-                            if ( desc.isLocal() ) {
-                                flag = true;
-                                break;
-                            }
-                        }
-                        final boolean supportsBridged = flag;
-
                         final Iterator<Resource> topicIter = jobsRoot.listChildren();
                         while ( caps.isActive() && topicIter.hasNext() ) {
                             final Resource topicResource = topicIter.next();
 
                             final String topicName = topicResource.getName().replace('.', '/');
                             this.logger.debug("Checking topic {}..." , topicName);
-                            final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(topicName, null);
+                            final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(topicName);
                             boolean reassign = true;
                             for(final InstanceDescription desc : potentialTargets) {
                                 if ( desc.isLocal() ) {
@@ -150,37 +137,35 @@ public class CheckTopologyTask {
                                     public boolean handle(final Resource rsrc) {
                                         try {
                                             final ValueMap vm = ResourceHelper.getValueMap(rsrc);
-                                            if ( !supportsBridged || vm.get(JobImpl.PROPERTY_BRIDGED_EVENT) == null ) {
-                                                final String targetId = caps.detectTarget(topicName, vm, info);
+                                            final String targetId = caps.detectTarget(topicName, vm, info);
 
-                                                final Map<String, Object> props = new HashMap<String, Object>(vm);
-                                                props.remove(Job.PROPERTY_JOB_STARTED_TIME);
+                                            final Map<String, Object> props = new HashMap<String, Object>(vm);
+                                            props.remove(Job.PROPERTY_JOB_STARTED_TIME);
 
-                                                final String newPath;
+                                            final String newPath;
+                                            if ( targetId != null ) {
+                                                newPath = configuration.getAssginedJobsPath() + '/' + targetId + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
+                                                props.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName);
+                                                props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
+                                            } else {
+                                                newPath = configuration.getUnassignedJobsPath() + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
+                                                props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
+                                                props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
+                                            }
+                                            try {
+                                                ResourceHelper.getOrCreateResource(resolver, newPath, props);
+                                                resolver.delete(rsrc);
+                                                resolver.commit();
+                                                final String jobId = vm.get(ResourceHelper.PROPERTY_JOB_ID, String.class);
                                                 if ( targetId != null ) {
-                                                    newPath = configuration.getAssginedJobsPath() + '/' + targetId + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
-                                                    props.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName);
-                                                    props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
+                                                    configuration.getAuditLogger().debug("REASSIGN OK {} : {}", targetId, jobId);
                                                 } else {
-                                                    newPath = configuration.getUnassignedJobsPath() + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length());
-                                                    props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
-                                                    props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
-                                                }
-                                                try {
-                                                    ResourceHelper.getOrCreateResource(resolver, newPath, props);
-                                                    resolver.delete(rsrc);
-                                                    resolver.commit();
-                                                    final String jobId = vm.get(ResourceHelper.PROPERTY_JOB_ID, String.class);
-                                                    if ( targetId != null ) {
-                                                        configuration.getAuditLogger().debug("REASSIGN OK {} : {}", targetId, jobId);
-                                                    } else {
-                                                        configuration.getAuditLogger().debug("REUNASSIGN OK : {}", jobId);
-                                                    }
-                                                } catch ( final PersistenceException pe ) {
-                                                    logger.warn("Unable to move stale job from " + rsrc.getPath() + " to " + newPath, pe);
-                                                    resolver.refresh();
-                                                    resolver.revert();
+                                                    configuration.getAuditLogger().debug("REUNASSIGN OK : {}", jobId);
                                                 }
+                                            } catch ( final PersistenceException pe ) {
+                                                logger.warn("Unable to move stale job from " + rsrc.getPath() + " to " + newPath, pe);
+                                                resolver.refresh();
+                                                resolver.revert();
                                             }
                                         } catch (final InstantiationException ie) {
                                             // something happened with the resource in the meantime
@@ -228,9 +213,6 @@ public class CheckTopologyTask {
         }
     }
 
-    /** Properties to include bridge job consumers for the quick test. */
-    private static final Map<String, Object> BRIDGED_JOB = Collections.singletonMap(JobImpl.PROPERTY_BRIDGED_EVENT, (Object)Boolean.TRUE);
-
     /**
      * Try to assign all jobs from the jobs root.
      * The jobs are stored by topic
@@ -249,7 +231,7 @@ public class CheckTopologyTask {
             logger.debug("Found topic {}", topicName);
 
             // first check if there is an instance for these topics
-            final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(topicName, BRIDGED_JOB);
+            final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(topicName);
             if ( potentialTargets != null && potentialTargets.size() > 0 ) {
                 final QueueConfigurationManager qcm = this.configuration.getQueueConfigurationManager();
                 if ( qcm == null ) {
@@ -345,7 +327,7 @@ public class CheckTopologyTask {
             }
             // check for all topics
             this.reassignStaleJobs();
-
+    
             // try to assign unassigned jobs
             this.assignUnassignedJobs();
         }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CleanUpTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CleanUpTask.java?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CleanUpTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/CleanUpTask.java Mon Oct 26 10:16:42 2015
@@ -18,21 +18,15 @@
  */
 package org.apache.sling.event.impl.jobs.tasks;
 
-import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Iterator;
-import java.util.List;
 
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
-import org.apache.sling.api.resource.ResourceUtil;
-import org.apache.sling.api.resource.ValueMap;
-import org.apache.sling.event.impl.jobs.Utility;
 import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
 import org.apache.sling.event.impl.jobs.config.TopologyCapabilities;
 import org.apache.sling.event.impl.jobs.scheduling.JobSchedulerImpl;
-import org.apache.sling.event.impl.support.BatchResourceRemover;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -97,88 +91,10 @@ public class CleanUpTask {
             }
         }
 
-        // lock cleanup is done every minute
-        this.lockCleanup(topologyCapabilities);
         logger.debug("Job manager maintenance: Finished #{}", this.schedulerRuns);
     }
 
     /**
-     * Clean up the locks
-     * All locks older than two minutes are removed
-     */
-    private void lockCleanup(final TopologyCapabilities caps) {
-        if ( caps != null && caps.isLeader() ) {
-            this.logger.debug("Cleaning up job resource tree: removing obsolete locks");
-            final List<Resource> candidates = new ArrayList<Resource>();
-            final ResourceResolver resolver = this.configuration.createResourceResolver();
-            if ( resolver != null ) {
-                try {
-                    final Resource parentResource = resolver.getResource(this.configuration.getLocksPath());
-                    if ( parentResource != null ) {
-                        final Calendar startDate = Calendar.getInstance();
-                        startDate.add(Calendar.MINUTE, -2);
-
-                        this.lockCleanup(caps, candidates, parentResource, startDate);
-                        final BatchResourceRemover remover = new BatchResourceRemover();
-                        boolean batchRemove = true;
-                        for(final Resource lockResource : candidates) {
-                            if ( caps.isActive() ) {
-                                try {
-                                    if ( batchRemove ) {
-                                        remover.delete(lockResource);
-                                    } else {
-                                        resolver.delete(lockResource);
-                                        resolver.commit();
-                                    }
-                                } catch ( final PersistenceException pe) {
-                                    batchRemove = false;
-                                    this.ignoreException(pe);
-                                    resolver.refresh();
-                                }
-                            } else {
-                                break;
-                            }
-                        }
-                        try {
-                            resolver.commit();
-                        } catch ( final PersistenceException pe) {
-                            this.ignoreException(pe);
-                            resolver.refresh();
-                        }
-                    }
-                } finally {
-                    resolver.close();
-                }
-            }
-        }
-    }
-
-    /**
-     * Recursive lock cleanup
-     */
-    private void lockCleanup(final TopologyCapabilities caps,
-            final List<Resource> candidates,
-            final Resource parentResource,
-            final Calendar startDate) {
-        for(final Resource childResource : parentResource.getChildren()) {
-            if ( caps.isActive() ) {
-                final ValueMap vm = ResourceUtil.getValueMap(childResource);
-                final Calendar created = vm.get(Utility.PROPERTY_LOCK_CREATED, Calendar.class);
-                if ( created != null ) {
-                    // lock resource
-                    if ( created.before(startDate) ) {
-                        candidates.add(childResource);
-                    }
-                } else {
-                    lockCleanup(caps, candidates, childResource, startDate);
-                }
-            } else {
-                break;
-            }
-        }
-    }
-
-    /**
      * Simple empty folder removes empty folders for the last ten minutes
      * starting five minutes ago.
      * If folder for minute 59 is removed, we check the hour folder as well.
@@ -339,14 +255,4 @@ public class CleanUpTask {
             resolver.close();
         }
     }
-
-    /**
-     * Helper method which just logs the exception in debug mode.
-     * @param e
-     */
-    private void ignoreException(final Exception e) {
-        if ( this.logger.isDebugEnabled() ) {
-            this.logger.debug("Ignored exception " + e.getMessage(), e);
-        }
-    }
 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java Mon Oct 26 10:16:42 2015
@@ -29,7 +29,6 @@ import org.apache.sling.api.resource.Res
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.api.resource.ValueMap;
 import org.apache.sling.discovery.InstanceDescription;
-import org.apache.sling.event.impl.jobs.JobImpl;
 import org.apache.sling.event.impl.jobs.JobTopicTraverser;
 import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
@@ -82,7 +81,7 @@ public class UpgradeTask {
      * This has changed, the jobs are now stored with their real topic.
      */
     private void upgradeBridgedJobs() {
-        final String path = configuration.getLocalJobsPath() + '/' + JobImpl.PROPERTY_BRIDGED_EVENT;
+        final String path = configuration.getLocalJobsPath() + "/slingevent:eventadmin";
         final ResourceResolver resolver = configuration.createResourceResolver();
         if ( resolver != null ) {
             try {
@@ -91,7 +90,7 @@ public class UpgradeTask {
                     upgradeBridgedJobs(rootResource);
                 }
                 if ( caps.isLeader() ) {
-                    final Resource unassignedRoot = resolver.getResource(configuration.getUnassignedJobsPath() + '/' + JobImpl.PROPERTY_BRIDGED_EVENT);
+                    final Resource unassignedRoot = resolver.getResource(configuration.getUnassignedJobsPath() + "/slingevent:eventadmin");
                     if ( unassignedRoot != null ) {
                         upgradeBridgedJobs(unassignedRoot);
                     }
@@ -226,7 +225,6 @@ public class UpgradeTask {
 
             final Map<String, Object> properties = ResourceHelper.cloneValueMap(vm);
 
-            properties.put(JobImpl.PROPERTY_BRIDGED_EVENT, true);
             final String topic = (String)properties.remove("slingevent:topic");
             properties.put(ResourceHelper.PROPERTY_JOB_TOPIC, topic);
 
@@ -243,7 +241,7 @@ public class UpgradeTask {
                 properties.put(Job.PROPERTY_JOB_RETRY_COUNT, 0);
             }
 
-            final List<InstanceDescription> potentialTargets = caps.getPotentialTargets("/", null);
+            final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(topic);
             String targetId = null;
             if ( potentialTargets != null && potentialTargets.size() > 0 ) {
                 final QueueConfigurationManager qcm = configuration.getQueueConfigurationManager();

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java?rev=1710549&r1=1710548&r2=1710549&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java Mon Oct 26 10:16:42 2015
@@ -35,12 +35,9 @@ import org.apache.sling.api.resource.Res
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.api.resource.ResourceUtil;
 import org.apache.sling.api.resource.ValueMap;
-import org.apache.sling.event.EventUtil;
 import org.apache.sling.event.impl.jobs.JobImpl;
 import org.apache.sling.event.impl.jobs.config.MainQueueConfiguration;
-import org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifier;
 import org.apache.sling.event.jobs.Job;
-import org.apache.sling.event.jobs.JobUtil;
 import org.apache.sling.event.jobs.ScheduleInfo;
 import org.apache.sling.event.jobs.consumer.JobConsumer;
 import org.osgi.service.event.EventConstants;
@@ -67,20 +64,15 @@ public abstract class ResourceHelper {
 
     public static final String PROPERTY_JOB_ID = "slingevent:eventId";
     public static final String PROPERTY_JOB_TOPIC = "event.job.topic";
+    public static final String PROPERTY_DISTRIBUTE = "event.distribute";
+    public static final String PROPERTY_APPLICATION = "event.application";
 
     /** List of ignored properties to write to the repository. */
-    @SuppressWarnings("deprecation")
     private static final String[] IGNORE_PROPERTIES = new String[] {
-        EventUtil.PROPERTY_DISTRIBUTE,
-        EventUtil.PROPERTY_APPLICATION,
+        ResourceHelper.PROPERTY_DISTRIBUTE,
+        ResourceHelper.PROPERTY_APPLICATION,
         EventConstants.EVENT_TOPIC,
         ResourceHelper.PROPERTY_JOB_ID,
-        JobUtil.PROPERTY_JOB_PARALLEL,
-        JobUtil.PROPERTY_JOB_RUN_LOCAL,
-        JobUtil.PROPERTY_JOB_QUEUE_ORDERED,
-        JobUtil.PROPERTY_NOTIFICATION_JOB,
-        Job.PROPERTY_JOB_PRIORITY,
-        JobStatusNotifier.CONTEXT_PROPERTY_NAME,
         JobImpl.PROPERTY_DELAY_OVERRIDE,
         JobConsumer.PROPERTY_JOB_ASYNC_HANDLER,
         Job.PROPERTY_JOB_PROGRESS_LOG,