You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2015/05/29 10:19:52 UTC
svn commit: r1682391 - in
/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs:
config/JobManagerConfiguration.java queues/JobQueueImpl.java
tasks/FindUnfinishedJobsTask.java
Author: cziegeler
Date: Fri May 29 08:19:51 2015
New Revision: 1682391
URL: http://svn.apache.org/r1682391
Log:
SLING-4762 : Latest job handling expects created date job property
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java?rev=1682391&r1=1682390&r2=1682391&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java Fri May 29 08:19:51 2015
@@ -183,6 +183,8 @@ public class JobManagerConfiguration imp
/** The topology capabilities. */
private volatile TopologyCapabilities topologyCapabilities;
+ private final AtomicBoolean firstTopologyEvent = new AtomicBoolean(true);
+
/**
* Activate this component.
* @param props Configuration properties
@@ -550,18 +552,24 @@ public class JobManagerConfiguration imp
}
}
+ TopologyEvent.Type eventType = event.getType();
+ if( this.firstTopologyEvent.compareAndSet(true, false) ) {
+ if ( eventType == Type.TOPOLOGY_CHANGED ) {
+ eventType = Type.TOPOLOGY_INIT;
+ }
+ }
synchronized ( this.listeners ) {
- if ( event.getType() == Type.TOPOLOGY_CHANGING ) {
+ if ( eventType == Type.TOPOLOGY_CHANGING ) {
this.stopProcessing();
- } else if ( event.getType() == Type.TOPOLOGY_INIT
+ } else if ( eventType == Type.TOPOLOGY_INIT
|| event.getType() == Type.TOPOLOGY_CHANGED
|| event.getType() == Type.PROPERTIES_CHANGED ) {
this.stopProcessing();
- this.startProcessing(event.getType(), new TopologyCapabilities(event.getNewView(), this), false);
+ this.startProcessing(eventType, new TopologyCapabilities(event.getNewView(), this), false);
}
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java?rev=1682391&r1=1682390&r2=1682391&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java Fri May 29 08:19:51 2015
@@ -269,7 +269,14 @@ public class JobQueueImpl
if ( handler.getConsumer() != null ) {
this.services.configuration.getAuditLogger().debug("START OK : {}", job.getId());
- final long queueTime = handler.started - job.getProperty(JobImpl.PROPERTY_JOB_QUEUED, Calendar.class).getTime().getTime();
+ // sanity check for the queued property
+ Calendar queued = job.getProperty(JobImpl.PROPERTY_JOB_QUEUED, Calendar.class);
+ if ( queued == null ) {
+ // we simply use a date of ten seconds ago
+ queued = Calendar.getInstance();
+ queued.setTimeInMillis(System.currentTimeMillis() - 10000);
+ }
+ final long queueTime = handler.started - queued.getTimeInMillis();
// update statistics
this.services.statisticsManager.jobStarted(this.queueName, job.getTopic(), queueTime);
// send notification
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java?rev=1682391&r1=1682390&r2=1682391&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java Fri May 29 08:19:51 2015
@@ -18,6 +18,7 @@
*/
package org.apache.sling.event.impl.jobs.tasks;
+import java.util.Calendar;
import java.util.Iterator;
import org.apache.sling.api.resource.ModifiableValueMap;
@@ -96,6 +97,7 @@ public class FindUnfinishedJobsTask {
@Override
public boolean handle(final JobImpl job) {
if ( job.getProcessingStarted() != null ) {
+ logger.debug("Found unfinished job {}", job.getId());
job.retry();
try {
final Resource jobResource = topicResource.getResourceResolver().getResource(job.getResourcePath());
@@ -104,12 +106,29 @@ public class FindUnfinishedJobsTask {
final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
mvm.remove(Job.PROPERTY_JOB_STARTED_TIME);
mvm.put(Job.PROPERTY_JOB_RETRY_COUNT, job.getRetryCount());
+ if ( job.getProperty(JobImpl.PROPERTY_JOB_QUEUED, Calendar.class) == null) {
+ mvm.put(JobImpl.PROPERTY_JOB_QUEUED, Calendar.getInstance());
+ }
jobResource.getResourceResolver().commit();
}
} catch ( final PersistenceException ignore) {
logger.error("Unable to update unfinished job " + job, ignore);
}
+ } else if ( job.getProperty(JobImpl.PROPERTY_JOB_QUEUED, Calendar.class) == null) {
+ logger.debug("Found job without queued date {}", job.getId());
+ try {
+ final Resource jobResource = topicResource.getResourceResolver().getResource(job.getResourcePath());
+ // sanity check
+ if ( jobResource != null ) {
+ final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
+ mvm.put(JobImpl.PROPERTY_JOB_QUEUED, Calendar.getInstance());
+ jobResource.getResourceResolver().commit();
+ }
+ } catch ( final PersistenceException ignore) {
+ logger.error("Unable to update queued date for job " + job.getId(), ignore);
+ }
}
+
return true;
}
});