You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2015/05/29 10:19:52 UTC

svn commit: r1682391 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs: config/JobManagerConfiguration.java queues/JobQueueImpl.java tasks/FindUnfinishedJobsTask.java

Author: cziegeler
Date: Fri May 29 08:19:51 2015
New Revision: 1682391

URL: http://svn.apache.org/r1682391
Log:
SLING-4762 : Latest job handling expects created date job property

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java?rev=1682391&r1=1682390&r2=1682391&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java Fri May 29 08:19:51 2015
@@ -183,6 +183,8 @@ public class JobManagerConfiguration imp
     /** The topology capabilities. */
     private volatile TopologyCapabilities topologyCapabilities;
 
+    private final AtomicBoolean firstTopologyEvent = new AtomicBoolean(true);
+
     /**
      * Activate this component.
      * @param props Configuration properties
@@ -550,18 +552,24 @@ public class JobManagerConfiguration imp
             }
         }
 
+        TopologyEvent.Type eventType = event.getType();
+        if( this.firstTopologyEvent.compareAndSet(true, false) ) {
+            if ( eventType == Type.TOPOLOGY_CHANGED ) {
+                eventType = Type.TOPOLOGY_INIT;
+            }
+        }
         synchronized ( this.listeners ) {
 
-            if ( event.getType() == Type.TOPOLOGY_CHANGING ) {
+            if ( eventType == Type.TOPOLOGY_CHANGING ) {
                this.stopProcessing();
 
-            } else if ( event.getType() == Type.TOPOLOGY_INIT
+            } else if ( eventType == Type.TOPOLOGY_INIT
                 || event.getType() == Type.TOPOLOGY_CHANGED
                 || event.getType() == Type.PROPERTIES_CHANGED ) {
 
                 this.stopProcessing();
 
-                this.startProcessing(event.getType(), new TopologyCapabilities(event.getNewView(), this), false);
+                this.startProcessing(eventType, new TopologyCapabilities(event.getNewView(), this), false);
             }
 
         }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java?rev=1682391&r1=1682390&r2=1682391&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java Fri May 29 08:19:51 2015
@@ -269,7 +269,14 @@ public class JobQueueImpl
 
                 if ( handler.getConsumer() != null ) {
                     this.services.configuration.getAuditLogger().debug("START OK : {}", job.getId());
-                    final long queueTime = handler.started - job.getProperty(JobImpl.PROPERTY_JOB_QUEUED, Calendar.class).getTime().getTime();
+                    // sanity check for the queued property
+                    Calendar queued = job.getProperty(JobImpl.PROPERTY_JOB_QUEUED, Calendar.class);
+                    if ( queued == null ) {
+                        // we simply use a date of ten seconds ago
+                        queued = Calendar.getInstance();
+                        queued.setTimeInMillis(System.currentTimeMillis() - 10000);
+                    }
+                    final long queueTime = handler.started - queued.getTimeInMillis();
                     // update statistics
                     this.services.statisticsManager.jobStarted(this.queueName, job.getTopic(), queueTime);
                     // send notification

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java?rev=1682391&r1=1682390&r2=1682391&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java Fri May 29 08:19:51 2015
@@ -18,6 +18,7 @@
  */
 package org.apache.sling.event.impl.jobs.tasks;
 
+import java.util.Calendar;
 import java.util.Iterator;
 
 import org.apache.sling.api.resource.ModifiableValueMap;
@@ -96,6 +97,7 @@ public class FindUnfinishedJobsTask {
             @Override
             public boolean handle(final JobImpl job) {
                 if ( job.getProcessingStarted() != null ) {
+                    logger.debug("Found unfinished job {}", job.getId());
                     job.retry();
                     try {
                         final Resource jobResource = topicResource.getResourceResolver().getResource(job.getResourcePath());
@@ -104,12 +106,29 @@ public class FindUnfinishedJobsTask {
                             final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
                             mvm.remove(Job.PROPERTY_JOB_STARTED_TIME);
                             mvm.put(Job.PROPERTY_JOB_RETRY_COUNT, job.getRetryCount());
+                            if ( job.getProperty(JobImpl.PROPERTY_JOB_QUEUED, Calendar.class) == null) {
+                                mvm.put(JobImpl.PROPERTY_JOB_QUEUED, Calendar.getInstance());
+                            }
                             jobResource.getResourceResolver().commit();
                         }
                     } catch ( final PersistenceException ignore) {
                         logger.error("Unable to update unfinished job " + job, ignore);
                     }
+                } else if ( job.getProperty(JobImpl.PROPERTY_JOB_QUEUED, Calendar.class) == null) {
+                    logger.debug("Found job without queued date {}", job.getId());
+                    try {
+                        final Resource jobResource = topicResource.getResourceResolver().getResource(job.getResourcePath());
+                        // sanity check
+                        if ( jobResource != null ) {
+                            final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
+                            mvm.put(JobImpl.PROPERTY_JOB_QUEUED, Calendar.getInstance());
+                            jobResource.getResourceResolver().commit();
+                        }
+                    } catch ( final PersistenceException ignore) {
+                        logger.error("Unable to update queued date for job " + job.getId(), ignore);
+                    }
                 }
+
                 return true;
             }
         });