You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/10/15 19:55:53 UTC
svn commit: r1632141 [1/3] - in /sling/trunk/bundles/extensions/event: ./
src/main/java/org/apache/sling/event/impl/jobs/
src/main/java/org/apache/sling/event/impl/jobs/config/
src/main/java/org/apache/sling/event/impl/jobs/queues/
src/main/java/org/ap...
Author: cziegeler
Date: Wed Oct 15 17:55:52 2014
New Revision: 1632141
URL: http://svn.apache.org/r1632141
Log:
SLING-4048 : Avoid keeping jobs in memory. Rewrite statistics, queue and topic handling (WiP)
Added:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java (with props)
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/JobsImplTest.java (with props)
Removed:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DropQueueTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/IgnoreQueueTest.java
Modified:
sling/trunk/bundles/extensions/event/pom.xml
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (contents, props changed)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/JobTopicTraverser.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java
Modified: sling/trunk/bundles/extensions/event/pom.xml
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/pom.xml?rev=1632141&r1=1632140&r2=1632141&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/pom.xml (original)
+++ sling/trunk/bundles/extensions/event/pom.xml Wed Oct 15 17:55:52 2014
@@ -116,7 +116,7 @@
-Xmx2048m -XX:MaxPermSize=512m
</argLine>
<includes>
- <include>**/it/*</include>
+ <include>**/it/OrderedQueueTest*</include>
</includes>
</configuration>
</plugin>
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java?rev=1632141&r1=1632140&r2=1632141&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java Wed Oct 15 17:55:52 2014
@@ -65,6 +65,8 @@ public class JobHandler {
* @return <code>true</code> if rescheduling was successful, <code>false</code> otherwise.
*/
public boolean reschedule() {
+ // update event with retry count and retries
+ this.job.retry();
return this.jobManager.reschedule(this.job);
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java?rev=1632141&r1=1632140&r2=1632141&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java Wed Oct 15 17:55:52 2014
@@ -68,6 +68,8 @@ public class JobImpl implements Job, Com
private final List<Exception> readErrorList;
+ private final long counter;
+
/**
* Create a new job instance
*
@@ -90,6 +92,8 @@ public class JobImpl implements Job, Com
this.properties = new ValueMapDecorator(properties);
this.properties.put(NotificationConstants.NOTIFICATION_PROPERTY_JOB_ID, jobId);
+ final int lastPos = jobId.lastIndexOf('_');
+ this.counter = Long.valueOf(jobId.substring(lastPos + 1));
}
/**
@@ -382,7 +386,13 @@ public class JobImpl implements Job, Com
public int compareTo(final JobImpl o) {
int result = this.getCreated().compareTo(o.getCreated());
if ( result == 0 ) {
- result = this.getTopic().compareTo(o.getTopic());
+ if ( this.counter < o.counter ) {
+ result = -1;
+ } else if ( this.counter > o.counter ) {
+ result = 1;
+ } else {
+ result = this.jobId.compareTo(o.jobId);
+ }
}
return result;
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1632141&r1=1632140&r2=1632141&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Wed Oct 15 17:55:52 2014
@@ -23,13 +23,9 @@ import java.util.Calendar;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -53,14 +49,9 @@ import org.apache.sling.event.EventUtil;
import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
-import org.apache.sling.event.impl.jobs.jmx.QueueStatusEvent;
-import org.apache.sling.event.impl.jobs.jmx.QueuesMBeanImpl;
import org.apache.sling.event.impl.jobs.queues.AbstractJobQueue;
-import org.apache.sling.event.impl.jobs.queues.OrderedJobQueue;
-import org.apache.sling.event.impl.jobs.queues.ParallelJobQueue;
-import org.apache.sling.event.impl.jobs.queues.TopicRoundRobinJobQueue;
-import org.apache.sling.event.impl.jobs.stats.StatisticsImpl;
-import org.apache.sling.event.impl.jobs.stats.TopicStatisticsImpl;
+import org.apache.sling.event.impl.jobs.queues.QueueManager;
+import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
import org.apache.sling.event.impl.support.Environment;
import org.apache.sling.event.impl.support.ResourceHelper;
import org.apache.sling.event.impl.support.ScheduleInfoImpl;
@@ -77,7 +68,6 @@ import org.apache.sling.event.jobs.Queue
import org.apache.sling.event.jobs.ScheduledJobInfo;
import org.apache.sling.event.jobs.Statistics;
import org.apache.sling.event.jobs.TopicStatistics;
-import org.apache.sling.event.jobs.consumer.JobExecutor;
import org.apache.sling.event.jobs.jmx.QueuesMBean;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
@@ -93,23 +83,21 @@ import org.slf4j.LoggerFactory;
@Component(immediate=true)
@Service(value={JobManager.class, EventHandler.class, Runnable.class})
@Properties({
- @Property(name="scheduler.period", longValue=60, propertyPrivate=true),
- @Property(name="scheduler.concurrent", boolValue=false, propertyPrivate=true),
+ @Property(name="scheduler.period", longValue=60),
+ @Property(name="scheduler.concurrent", boolValue=false),
@Property(name=EventConstants.EVENT_TOPIC,
value={SlingConstants.TOPIC_RESOURCE_ADDED,
SlingConstants.TOPIC_RESOURCE_CHANGED,
SlingConstants.TOPIC_RESOURCE_REMOVED,
- "org/apache/sling/event/notification/job/*",
Utility.TOPIC_STOP,
ResourceHelper.BUNDLE_EVENT_STARTED,
- ResourceHelper.BUNDLE_EVENT_UPDATED}, propertyPrivate=true)
+ ResourceHelper.BUNDLE_EVENT_UPDATED})
})
public class JobManagerImpl
- extends StatisticsImpl
implements JobManager, EventHandler, Runnable, TopologyAware {
/** Default logger. */
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
+ private final Logger logger = new TestLogger(LoggerFactory.getLogger(this.getClass()));
@Reference
private TopologyHandler topologyHandler;
@@ -136,30 +124,18 @@ public class JobManagerImpl
@Reference
private QueueConfigurationManager queueManager;
- private volatile TopologyCapabilities topologyCapabilities;
-
- private MaintenanceTask maintenanceTask;
+ @Reference
+ private StatisticsManager statisticsManager;
- private BackgroundLoader backgroundLoader;
+ @Reference QueueManager qManager;
- /** Lock object for the queues map - we don't want to sync directly on the concurrent map. */
- private final Object queuesLock = new Object();
+ private volatile TopologyCapabilities topologyCapabilities;
- /** All active queues. */
- private final Map<String, AbstractJobQueue> queues = new ConcurrentHashMap<String, AbstractJobQueue>();
+ private MaintenanceTask maintenanceTask;
/** We count the scheduler runs. */
private volatile long schedulerRuns;
- /** Current statistics. */
- private final StatisticsImpl baseStatistics = new StatisticsImpl();
-
- /** Statistics per topic. */
- private final ConcurrentMap<String, TopicStatistics> topicStatistics = new ConcurrentHashMap<String, TopicStatistics>();
-
- /** Set of paths directly added as jobs - these will be ignored during observation handling. */
- private final Set<String> directlyAddedPaths = new HashSet<String>();
-
/** Job Scheduler. */
private JobSchedulerImpl jobScheduler;
@@ -171,7 +147,6 @@ public class JobManagerImpl
protected void activate(final Map<String, Object> props) throws LoginException {
this.jobScheduler = new JobSchedulerImpl(this.configuration, this.scheduler, this);
this.maintenanceTask = new MaintenanceTask(this.configuration);
- this.backgroundLoader = new BackgroundLoader(this, this.configuration);
this.topologyHandler.addListener(this);
logger.info("Apache Sling Job Manager started on instance {}", Environment.APPLICATION_ID);
@@ -187,18 +162,7 @@ public class JobManagerImpl
this.jobScheduler.deactivate();
- this.backgroundLoader.deactivate();
- this.backgroundLoader = null;
-
this.maintenanceTask = null;
- final Iterator<AbstractJobQueue> i = this.queues.values().iterator();
- while ( i.hasNext() ) {
- final AbstractJobQueue jbq = i.next();
- jbq.close();
- // update mbeans
- ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(null, jbq));
- }
- this.queues.clear();
logger.info("Apache Sling Job Manager stopped on instance {}", Environment.APPLICATION_ID);
}
@@ -212,37 +176,6 @@ public class JobManagerImpl
this.schedulerRuns++;
logger.debug("Job manager maintenance: Starting #{}", this.schedulerRuns);
- // check for unprocessed jobs first
- logger.debug("Checking for unprocessed jobs...");
- for(final AbstractJobQueue jbq : this.queues.values() ) {
- jbq.checkForUnprocessedJobs();
- }
-
- // we only do a full clean up on every fifth run
- final boolean doFullCleanUp = (schedulerRuns % 5 == 0);
-
- if ( doFullCleanUp ) {
- // check for idle queue
- logger.debug("Checking for idle queues...");
-
- // we synchronize to avoid creating a queue which is about to be removed during cleanup
- synchronized ( queuesLock ) {
- final Iterator<Map.Entry<String, AbstractJobQueue>> i = this.queues.entrySet().iterator();
- while ( i.hasNext() ) {
- final Map.Entry<String, AbstractJobQueue> current = i.next();
- final AbstractJobQueue jbq = current.getValue();
- if ( jbq.tryToClose() ) {
- logger.debug("Removing idle job queue {}", jbq);
- // copy statistics
- this.baseStatistics.add(jbq);
- // remove
- i.remove();
- // update mbeans
- ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(null, jbq));
- }
- }
- }
- }
// invoke maintenance task
final MaintenanceTask task = this.maintenanceTask;
if ( task != null ) {
@@ -252,93 +185,6 @@ public class JobManagerImpl
}
/**
- * Process a new job
- * This method first searches the corresponding queue - if such a queue
- * does not exist yet, it is created and started.
- *
- * @param job The job
- */
- void process(final JobImpl job) {
- // check if we still are able to process this job
- final JobExecutor consumer = this.jobConsumerManager.getExecutor(job.getTopic());
- boolean reassign = false;
- String reassignTargetId = null;
- if ( consumer == null && (!job.isBridgedEvent() || !this.jobConsumerManager.supportsBridgedEvents())) {
- reassign = true;
- }
-
- // get the queue configuration
- final TopologyCapabilities caps = this.topologyCapabilities;
- final QueueInfo queueInfo = caps != null ? caps.getQueueInfo(job.getTopic()) : null;
- if ( queueInfo == null ) {
- return; // TODO
- }
- final InternalQueueConfiguration config = queueInfo.queueConfiguration;
-
- // Sanity check if queue configuration has changed
- if ( config.getType() == QueueConfiguration.Type.DROP ) {
- if ( logger.isDebugEnabled() ) {
- logger.debug("Dropping job due to configuration of queue {} : {}", queueInfo.queueName, Utility.toString(job));
- }
- this.finishJob(job, Job.JobState.DROPPED, false, -1);
- } else if ( config.getType() == QueueConfiguration.Type.IGNORE ) {
- if ( !reassign ) {
- if ( logger.isDebugEnabled() ) {
- logger.debug("Ignoring job due to configuration of queue {} : {}", queueInfo.queueName, Utility.toString(job));
- }
- }
- } else {
-
- if ( reassign ) {
- reassignTargetId = (caps == null ? null : caps.detectTarget(job.getTopic(), job.getProperties(), queueInfo));
-
- } else {
- // get or create queue
- AbstractJobQueue queue = null;
- // we synchronize to avoid creating a queue which is about to be removed during cleanup
- synchronized ( queuesLock ) {
- queue = this.queues.get(queueInfo.queueName);
- // check for reconfiguration, we really do an identity check here(!)
- if ( queue != null && queue.getConfiguration() != config ) {
- this.outdateQueue(queue);
- // we use a new queue with the configuration
- queue = null;
- }
- if ( queue == null ) {
- if ( config.getType() == QueueConfiguration.Type.ORDERED ) {
- queue = new OrderedJobQueue(queueInfo.queueName, config, this.jobConsumerManager, this.threadPoolManager, this.eventAdmin);
- } else if ( config.getType() == QueueConfiguration.Type.UNORDERED ) {
- queue = new ParallelJobQueue(queueInfo.queueName, config, this.jobConsumerManager, this.threadPoolManager, this.eventAdmin, this.scheduler);
- } else if ( config.getType() == QueueConfiguration.Type.TOPIC_ROUND_ROBIN ) {
- queue = new TopicRoundRobinJobQueue(queueInfo.queueName, config, this.jobConsumerManager, this.threadPoolManager, this.eventAdmin, this.scheduler);
- }
- if ( queue == null ) {
- // this is just a sanity check, actually we can never get here
- logger.warn("Ignoring event due to unknown queue type of queue {} : {}", queueInfo.queueName, Utility.toString(job));
- this.finishJob(job, Job.JobState.DROPPED, false, -1);
- } else {
- queues.put(queueInfo.queueName, queue);
- ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(queue, null));
- queue.start();
- }
- }
- }
-
- // and put job
- if ( queue != null ) {
- job.updateQueueInfo(queue);
- final JobHandler handler = new JobHandler(job, this);
-
- queue.process(handler);
- }
- }
- }
- if ( reassign ) {
- this.maintenanceTask.reassignJob(job, reassignTargetId);
- }
- }
-
- /**
* This method is invoked periodically by the scheduler.
* In the default configuration every minute
* @see java.lang.Runnable#run()
@@ -358,61 +204,13 @@ public class JobManagerImpl
}
}
- private void outdateQueue(final AbstractJobQueue queue) {
- // remove the queue with the old name
- // check for main queue
- final String oldName = ResourceHelper.filterQueueName(queue.getName());
- this.queues.remove(oldName);
- // check if we can close or have to rename
- if ( queue.tryToClose() ) {
- // copy statistics
- this.baseStatistics.add(queue);
- // update mbeans
- ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(null, queue));
- } else {
- queue.outdate();
- // readd with new name
- String newName = ResourceHelper.filterName(queue.getName());
- int index = 0;
- while ( this.queues.containsKey(newName) ) {
- newName = ResourceHelper.filterName(queue.getName()) + '$' + String.valueOf(index++);
- }
- this.queues.put(newName, queue);
- // update mbeans
- ((QueuesMBeanImpl)queuesMBean).sendEvent(new QueueStatusEvent(queue, queue));
- }
- }
-
- /**
- * @see org.apache.sling.event.impl.jobs.stats.StatisticsImpl#reset()
- * Reset this statistics and all queues.
- */
- @Override
- public synchronized void reset() {
- this.baseStatistics.reset();
- for(final AbstractJobQueue jq : this.queues.values() ) {
- jq.reset();
- }
- this.topicStatistics.clear();
- }
-
/**
* @see org.apache.sling.event.jobs.JobManager#restart()
*/
@Override
public void restart() {
- // let's rename/close all queues and clear them
- synchronized ( queuesLock ) {
- final List<AbstractJobQueue> queues = new ArrayList<AbstractJobQueue>(this.queues.values());
- for(final AbstractJobQueue queue : queues ) {
- queue.clear();
- this.outdateQueue(queue);
- }
- }
- // reset statistics
- this.reset();
- // and now load again
- this.backgroundLoader.restart();
+ // TODO reset statistics
+ // TODO reload queues?
}
/**
@@ -429,17 +227,6 @@ public class JobManagerImpl
@Override
public void handleEvent(final Event event) {
if ( SlingConstants.TOPIC_RESOURCE_ADDED.equals(event.getTopic()) ) {
- final String path = (String) event.getProperty(SlingConstants.PROPERTY_PATH);
- final String rt = (String) event.getProperty(SlingConstants.PROPERTY_RESOURCE_TYPE);
- if ( (rt == null || ResourceHelper.RESOURCE_TYPE_JOB.equals(rt)) &&
- this.configuration.isLocalJob(path) ) {
- synchronized ( this.directlyAddedPaths ) {
- if ( directlyAddedPaths.remove(path) ) {
- return;
- }
- }
- this.backgroundLoader.loadJob(path);
- }
this.jobScheduler.handleEvent(event);
} else if ( Utility.TOPIC_STOP.equals(event.getTopic()) ) {
if ( !EventUtil.isLocal(event) ) {
@@ -448,57 +235,20 @@ public class JobManagerImpl
}
} else if ( ResourceHelper.BUNDLE_EVENT_STARTED.equals(event.getTopic())
|| ResourceHelper.BUNDLE_EVENT_UPDATED.equals(event.getTopic()) ) {
- this.backgroundLoader.tryToReloadUnloadedJobs();
this.jobScheduler.handleEvent(event);
} else if ( SlingConstants.TOPIC_RESOURCE_CHANGED.equals(event.getTopic())
|| SlingConstants.TOPIC_RESOURCE_REMOVED.equals(event.getTopic()) ) {
this.jobScheduler.handleEvent(event);
- } else {
- if ( EventUtil.isLocal(event) ) {
- // job notifications
- final String topic = (String)event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC);
- if ( topic != null ) { // this is just a sanity check
- TopicStatisticsImpl ts = (TopicStatisticsImpl)this.topicStatistics.get(topic);
- if ( ts == null ) {
- this.topicStatistics.putIfAbsent(topic, new TopicStatisticsImpl(topic));
- ts = (TopicStatisticsImpl)this.topicStatistics.get(topic);
- }
- if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_CANCELLED) ) {
- ts.addCancelled();
- } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_FAILED) ) {
- ts.addFailed();
- } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_FINISHED) ) {
- final Long time = (Long)event.getProperty(Utility.PROPERTY_TIME);
- ts.addFinished(time == null ? -1 : time);
- } else if ( event.getTopic().equals(NotificationConstants.TOPIC_JOB_STARTED) ) {
- final Long time = (Long)event.getProperty(Utility.PROPERTY_TIME);
- ts.addActivated(time == null ? -1 : time);
- }
- }
- }
}
}
private void stopProcessing() {
- this.backgroundLoader.stop();
-
- // let's rename/close all queues and clear them
- synchronized ( queuesLock ) {
- final List<AbstractJobQueue> queues = new ArrayList<AbstractJobQueue>(this.queues.values());
- for(final AbstractJobQueue queue : queues ) {
- queue.clear();
- this.outdateQueue(queue);
- }
- }
-
this.topologyCapabilities = null;
}
private void startProcessing(final TopologyCapabilities caps) {
// create new capabilities and update view
this.topologyCapabilities = caps;
-
- this.backgroundLoader.start();
}
@Override
@@ -518,12 +268,7 @@ public class JobManagerImpl
*/
@Override
public synchronized Statistics getStatistics() {
- this.copyFrom(this.baseStatistics);
- for(final AbstractJobQueue jq : this.queues.values() ) {
- this.add(jq);
- }
-
- return this;
+ return this.statisticsManager.getOverallStatistics();
}
/**
@@ -531,7 +276,7 @@ public class JobManagerImpl
*/
@Override
public Iterable<TopicStatistics> getTopicStatistics() {
- return topicStatistics.values();
+ return this.statisticsManager.getTopicStatistics().values();
}
/**
@@ -539,7 +284,7 @@ public class JobManagerImpl
*/
@Override
public Queue getQueue(final String name) {
- return this.queues.get(ResourceHelper.filterQueueName(name));
+ return qManager.getQueue(ResourceHelper.filterQueueName(name));
}
/**
@@ -547,30 +292,7 @@ public class JobManagerImpl
*/
@Override
public Iterable<Queue> getQueues() {
- final Iterator<AbstractJobQueue> jqI = this.queues.values().iterator();
- return new Iterable<Queue>() {
-
- @Override
- public Iterator<Queue> iterator() {
- return new Iterator<Queue>() {
-
- @Override
- public boolean hasNext() {
- return jqI.hasNext();
- }
-
- @Override
- public Queue next() {
- return jqI.next();
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
- };
+ return qManager.getQueues();
}
@Override
@@ -1198,12 +920,7 @@ public class JobManagerImpl
jobName,
jobProperties,
info);
- if ( job != null ) {
- if ( configuration.isLocalJob(job.getResourcePath()) ) {
- this.backgroundLoader.addJob(job);
- }
- return job;
- }
+ return job;
} catch (final PersistenceException re ) {
// something went wrong, so let's log it
this.logger.error("Exception during persisting new job '" + Utility.toString(jobTopic, jobName, jobProperties) + "'", re);
@@ -1268,9 +985,6 @@ public class JobManagerImpl
if ( logger.isDebugEnabled() ) {
logger.debug("Storing new job {} at {}", properties, path);
}
- synchronized ( this.directlyAddedPaths ) {
- this.directlyAddedPaths.add(path);
- }
ResourceHelper.getOrCreateResource(resolver,
path,
properties);
@@ -1331,6 +1045,8 @@ public class JobManagerImpl
resolver.commit();
return true;
+ } else {
+ logger.debug("No job resource found at {}", job.getResourcePath());
}
} catch ( final PersistenceException ignore ) {
this.ignoreException(ignore);
@@ -1353,10 +1069,8 @@ public class JobManagerImpl
if ( job != null && !this.configuration.isStoragePath(job.getResourcePath()) ) {
// get the queue configuration
final QueueInfo queueInfo = this.queueManager.getQueueInfo(job.getTopic());
- final AbstractJobQueue queue;
- synchronized ( queuesLock ) {
- queue = this.queues.get(queueInfo.queueName);
- }
+ final AbstractJobQueue queue = (AbstractJobQueue)this.qManager.getQueue(queueInfo.queueName);
+
boolean stopped = false;
if ( queue != null ) {
stopped = queue.stopJob(job);
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java?rev=1632141&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java Wed Oct 15 17:55:52 2014
@@ -0,0 +1,320 @@
+package org.apache.sling.event.impl.jobs;
+
+import org.slf4j.Logger;
+import org.slf4j.Marker;
+
+public class TestLogger implements Logger {
+
+ private final boolean DEBUG = false;
+
+ private final Logger logger;
+
+ @Override
+ public String getName() {
+ return logger.getName();
+ }
+
+ @Override
+ public boolean isTraceEnabled() {
+ return logger.isTraceEnabled();
+ }
+
+ @Override
+ public void trace(String msg) {
+ logger.trace(msg);
+ }
+
+ @Override
+ public void trace(String format, Object arg) {
+ logger.trace(format, arg);
+ }
+
+ @Override
+ public void trace(String format, Object arg1, Object arg2) {
+ logger.trace(format, arg1, arg2);
+ }
+
+ @Override
+ public void trace(String format, Object[] argArray) {
+ logger.trace(format, argArray);
+ }
+
+ @Override
+ public void trace(String msg, Throwable t) {
+ logger.trace(msg, t);
+ }
+
+ @Override
+ public boolean isTraceEnabled(Marker marker) {
+ return logger.isTraceEnabled(marker);
+ }
+
+ @Override
+ public void trace(Marker marker, String msg) {
+ logger.trace(marker, msg);
+ }
+
+ @Override
+ public void trace(Marker marker, String format, Object arg) {
+ logger.trace(marker, format, arg);
+ }
+
+ @Override
+ public void trace(Marker marker, String format, Object arg1, Object arg2) {
+ logger.trace(marker, format, arg1, arg2);
+ }
+
+ @Override
+ public void trace(Marker marker, String format, Object[] argArray) {
+ logger.trace(marker, format, argArray);
+ }
+
+ @Override
+ public void trace(Marker marker, String msg, Throwable t) {
+ logger.trace(marker, msg, t);
+ }
+
+ @Override
+ public boolean isDebugEnabled() {
+ return ( DEBUG ? logger.isInfoEnabled() : logger.isDebugEnabled());
+ }
+
+ @Override
+ public void debug(String msg) {
+ if ( DEBUG) logger.info(msg); else logger.debug(msg);
+ }
+
+ @Override
+ public void debug(String format, Object arg) {
+ if ( DEBUG) logger.info(format, arg);else logger.debug(format, arg);
+ }
+
+ @Override
+ public void debug(String format, Object arg1, Object arg2) {
+ if ( DEBUG) logger.info(format, arg1, arg2);else logger.debug(format, arg1, arg2);
+ }
+
+ @Override
+ public void debug(String format, Object[] argArray) {
+ if ( DEBUG) logger.info(format, argArray);else logger.debug(format,argArray);
+ }
+
+ @Override
+ public void debug(String msg, Throwable t) {
+ if ( DEBUG) logger.info(msg, t);else logger.debug(msg,t);
+ }
+
+ @Override
+ public boolean isDebugEnabled(Marker marker) {
+ return (DEBUG ? logger.isInfoEnabled(marker) : logger.isDebugEnabled(marker));
+ }
+
+ @Override
+ public void debug(Marker marker, String msg) {
+ if ( DEBUG) logger.info(marker, msg);else logger.debug(marker,msg);
+ }
+
+ @Override
+ public void debug(Marker marker, String format, Object arg) {
+ if ( DEBUG) logger.info(marker, format, arg);else logger.debug(marker,format,arg);
+ }
+
+ @Override
+ public void debug(Marker marker, String format, Object arg1, Object arg2) {
+ if ( DEBUG) logger.info(marker, format, arg1, arg2);else logger.debug(marker, format, arg1, arg2);
+ }
+
+ @Override
+ public void debug(Marker marker, String format, Object[] argArray) {
+ if ( DEBUG) logger.info(marker, format, argArray); else logger.debug(marker, format, argArray);
+ }
+
+ @Override
+ public void debug(Marker marker, String msg, Throwable t) {
+ if ( DEBUG) logger.info(marker, msg, t); else logger.debug(marker, msg, t);
+ }
+
+ @Override
+ public boolean isInfoEnabled() {
+ return logger.isInfoEnabled();
+ }
+
+ @Override
+ public void info(String msg) {
+ logger.info(msg);
+ }
+
+ @Override
+ public void info(String format, Object arg) {
+ logger.info(format, arg);
+ }
+
+ @Override
+ public void info(String format, Object arg1, Object arg2) {
+ logger.info(format, arg1, arg2);
+ }
+
+ @Override
+ public void info(String format, Object[] argArray) {
+ logger.info(format, argArray);
+ }
+
+ @Override
+ public void info(String msg, Throwable t) {
+ logger.info(msg, t);
+ }
+
+ @Override
+ public boolean isInfoEnabled(Marker marker) {
+ return logger.isInfoEnabled(marker);
+ }
+
+ @Override
+ public void info(Marker marker, String msg) {
+ logger.info(marker, msg);
+ }
+
+ @Override
+ public void info(Marker marker, String format, Object arg) {
+ logger.info(marker, format, arg);
+ }
+
+ @Override
+ public void info(Marker marker, String format, Object arg1, Object arg2) {
+ logger.info(marker, format, arg1, arg2);
+ }
+
+ @Override
+ public void info(Marker marker, String format, Object[] argArray) {
+ logger.info(marker, format, argArray);
+ }
+
+ @Override
+ public void info(Marker marker, String msg, Throwable t) {
+ logger.info(marker, msg, t);
+ }
+
+ @Override
+ public boolean isWarnEnabled() {
+ return logger.isWarnEnabled();
+ }
+
+ @Override
+ public void warn(String msg) {
+ logger.warn(msg);
+ }
+
+ @Override
+ public void warn(String format, Object arg) {
+ logger.warn(format, arg);
+ }
+
+ @Override
+ public void warn(String format, Object[] argArray) {
+ logger.warn(format, argArray);
+ }
+
+ @Override
+ public void warn(String format, Object arg1, Object arg2) {
+ logger.warn(format, arg1, arg2);
+ }
+
+ @Override
+ public void warn(String msg, Throwable t) {
+ logger.warn(msg, t);
+ }
+
+ @Override
+ public boolean isWarnEnabled(Marker marker) {
+ return logger.isWarnEnabled(marker);
+ }
+
+ @Override
+ public void warn(Marker marker, String msg) {
+ logger.warn(marker, msg);
+ }
+
+ @Override
+ public void warn(Marker marker, String format, Object arg) {
+ logger.warn(marker, format, arg);
+ }
+
+ @Override
+ public void warn(Marker marker, String format, Object arg1, Object arg2) {
+ logger.warn(marker, format, arg1, arg2);
+ }
+
+ @Override
+ public void warn(Marker marker, String format, Object[] argArray) {
+ logger.warn(marker, format, argArray);
+ }
+
+ @Override
+ public void warn(Marker marker, String msg, Throwable t) {
+ logger.warn(marker, msg, t);
+ }
+
+ @Override
+ public boolean isErrorEnabled() {
+ return logger.isErrorEnabled();
+ }
+
+ @Override
+ public void error(String msg) {
+ logger.error(msg);
+ }
+
+ @Override
+ public void error(String format, Object arg) {
+ logger.error(format, arg);
+ }
+
+ @Override
+ public void error(String format, Object arg1, Object arg2) {
+ logger.error(format, arg1, arg2);
+ }
+
+ @Override
+ public void error(String format, Object[] argArray) {
+ logger.error(format, argArray);
+ }
+
+ @Override
+ public void error(String msg, Throwable t) {
+ logger.error(msg, t);
+ }
+
+ @Override
+ public boolean isErrorEnabled(Marker marker) {
+ return logger.isErrorEnabled(marker);
+ }
+
+ @Override
+ public void error(Marker marker, String msg) {
+ logger.error(marker, msg);
+ }
+
+ @Override
+ public void error(Marker marker, String format, Object arg) {
+ logger.error(marker, format, arg);
+ }
+
+ @Override
+ public void error(Marker marker, String format, Object arg1, Object arg2) {
+ logger.error(marker, format, arg1, arg2);
+ }
+
+ @Override
+ public void error(Marker marker, String format, Object[] argArray) {
+ logger.error(marker, format, argArray);
+ }
+
+ @Override
+ public void error(Marker marker, String msg, Throwable t) {
+ logger.error(marker, msg, t);
+ }
+
+ public TestLogger(final Logger l) {
+ this.logger = l;
+ }
+}
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java?rev=1632141&r1=1632140&r2=1632141&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java Wed Oct 15 17:55:52 2014
@@ -46,9 +46,7 @@ import org.osgi.framework.Constants;
value=ConfigurationConstants.DEFAULT_TYPE,
options={@PropertyOption(name="UNORDERED",value="Parallel"),
@PropertyOption(name="ORDERED",value="Ordered"),
- @PropertyOption(name="TOPIC_ROUND_ROBIN",value="Topic Round Robin"),
- @PropertyOption(name="IGNORE",value="Ignore"),
- @PropertyOption(name="DROP",value="Drop")}),
+ @PropertyOption(name="TOPIC_ROUND_ROBIN",value="Topic Round Robin")}),
@Property(name=ConfigurationConstants.PROP_TOPICS,
unbounded=PropertyUnbounded.ARRAY),
@Property(name=ConfigurationConstants.PROP_MAX_PARALLEL,
@@ -183,6 +181,9 @@ public class InternalQueueConfiguration
return false;
}
}
+ if ( type == Type.IGNORE || type == Type.DROP ) {
+ return false;
+ }
return true;
}