You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2017/05/04 09:07:53 UTC

svn commit: r1793757 - in /sling/trunk/bundles/extensions/event/resource: ./ src/main/java/org/apache/sling/event/impl/jobs/config/ src/main/java/org/apache/sling/event/impl/jobs/queues/ src/main/java/org/apache/sling/event/impl/jobs/scheduling/

Author: cziegeler
Date: Thu May  4 09:07:53 2017
New Revision: 1793757

URL: http://svn.apache.org/viewvc?rev=1793757&view=rev
Log:
SLING-6823 : Use Timer instead of scheduler for delayed execution

Modified:
    sling/trunk/bundles/extensions/event/resource/pom.xml
    sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
    sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
    sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
    sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java
    sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java

Modified: sling/trunk/bundles/extensions/event/resource/pom.xml
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/resource/pom.xml?rev=1793757&r1=1793756&r2=1793757&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/resource/pom.xml (original)
+++ sling/trunk/bundles/extensions/event/resource/pom.xml Thu May  4 09:07:53 2017
@@ -82,16 +82,8 @@
                             org.apache.sling.commons.osgi;inline="org/apache/sling/commons/osgi/PropertiesUtil.*",
                             quartz;inline="org/quartz/CronExpression.*|org/quartz/ValueSet.*"
                         </Embed-Dependency>
-                        <_plugin>org.apache.felix.scrplugin.bnd.SCRDescriptorBndPlugin;destdir=${project.build.outputDirectory};</_plugin>
                     </instructions>
                 </configuration>
-                <dependencies>
-                    <dependency>
-                        <groupId>org.apache.felix</groupId>
-                        <artifactId>org.apache.felix.scr.bnd</artifactId>
-                        <version>1.7.2</version>
-                    </dependency>
-                </dependencies>
             </plugin>
            <plugin>
                 <groupId>org.apache.rat</groupId>

Modified: sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java?rev=1793757&r1=1793756&r2=1793757&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java Thu May  4 09:07:53 2017
@@ -18,12 +18,13 @@
  */
 package org.apache.sling.event.impl.jobs.config;
 
-import java.sql.Date;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -494,27 +495,22 @@ public class JobManagerConfiguration {
         } else {
             // and run checker again in some seconds (if leader)
             // notify listeners afterwards
-            final Scheduler local = this.scheduler;
-            if ( local != null ) {
-                final Runnable r = new Runnable() {
-
-                    @Override
-                    public void run() {
-                        if ( newCaps == topologyCapabilities && newCaps.isActive()) {
-                            // start listeners
-                            notifiyListeners();
-                            if ( newCaps.isLeader() && newCaps.isActive() ) {
-                                final CheckTopologyTask mt = new CheckTopologyTask(JobManagerConfiguration.this);
-                                mt.fullRun();
-                            }
+            final Timer timer = new Timer();
+            timer.schedule(new TimerTask()
+            {
+
+                @Override
+                public void run() {
+                    if ( newCaps == topologyCapabilities && newCaps.isActive()) {
+                        // start listeners
+                        notifiyListeners();
+                        if ( newCaps.isLeader() && newCaps.isActive() ) {
+                            final CheckTopologyTask mt = new CheckTopologyTask(JobManagerConfiguration.this);
+                            mt.fullRun();
                         }
                     }
-                };
-                if ( !local.schedule(r, local.AT(new Date(System.currentTimeMillis() + this.backgroundLoadDelay * 1000))) ) {
-                    // if for whatever reason scheduling doesn't work, let's run now
-                    r.run();
                 }
-            }
+            }, this.backgroundLoadDelay * 1000);
         }
         logger.debug("Job processing started");
     }

Modified: sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java?rev=1793757&r1=1793756&r2=1793757&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java (original)
+++ sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java Thu May  4 09:07:53 2017
@@ -23,6 +23,8 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -80,7 +82,7 @@ public class JobQueueImpl
     private final QueueServices services;
 
     /** The map of events we're processing. */
-    private final Map<String, JobHandler> processingJobsLists = new HashMap<String, JobHandler>();
+    private final Map<String, JobHandler> processingJobsLists = new HashMap<>();
 
     private final ThreadPool threadPool;
 
@@ -676,7 +678,6 @@ public class JobQueueImpl
                 this.isSleepingUntil = fireDate.getTime();
             }
 
-            final String jobName = "Waiting:" + queueName + ":" + handler.hashCode();
             final Runnable t = new Runnable() {
                 @Override
                 public void run() {
@@ -695,10 +696,14 @@ public class JobQueueImpl
                 }
             };
             this.waitCounter.incrementAndGet();
-            if ( !services.scheduler.schedule(t, services.scheduler.AT(fireDate).name(jobName)) ) {
-                // if scheduling fails run the thread directly
-                t.run();
-            }
+            final Timer timer = new Timer();
+            timer.schedule(new TimerTask() {
+
+                @Override
+                public void run() {
+                    t.run();
+                }
+            }, delay);
         } else {
             // put directly into queue
             this.requeue(handler);

Modified: sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java?rev=1793757&r1=1793756&r2=1793757&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java (original)
+++ sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java Thu May  4 09:07:53 2017
@@ -85,9 +85,6 @@ public class QueueManager
     private EventAdmin eventAdmin;
 
     @Reference
-    private Scheduler scheduler;
-
-    @Reference
     private JobConsumerManager jobConsumerManager;
 
     @Reference
@@ -135,7 +132,6 @@ public class QueueManager
         queueServices.configuration = this.configuration;
         queueServices.eventAdmin = this.eventAdmin;
         queueServices.jobConsumerManager = this.jobConsumerManager;
-        queueServices.scheduler = this.scheduler;
         queueServices.threadPoolManager = this.threadPoolManager;
         queueServices.statisticsManager = statisticsManager;
         queueServices.eventingThreadPool = this.threadPool;

Modified: sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java?rev=1793757&r1=1793756&r2=1793757&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java (original)
+++ sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java Thu May  4 09:07:53 2017
@@ -18,7 +18,6 @@
  */
 package org.apache.sling.event.impl.jobs.queues;
 
-import org.apache.sling.commons.scheduler.Scheduler;
 import org.apache.sling.commons.threads.ThreadPool;
 import org.apache.sling.commons.threads.ThreadPoolManager;
 import org.apache.sling.event.impl.jobs.JobConsumerManager;
@@ -41,8 +40,6 @@ public class QueueServices {
 
     public ThreadPoolManager threadPoolManager;
 
-    public Scheduler scheduler;
-
     public StatisticsManager statisticsManager;
 
     public ThreadPool eventingThreadPool;

Modified: sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java?rev=1793757&r1=1793756&r2=1793757&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java Thu May  4 09:07:53 2017
@@ -88,7 +88,7 @@ public class JobSchedulerImpl
     private final ScheduledJobHandler scheduledJobHandler;
 
     /** All scheduled jobs, by scheduler name */
-    private final Map<String, ScheduledJobInfoImpl> scheduledJobs = new HashMap<String, ScheduledJobInfoImpl>();
+    private final Map<String, ScheduledJobInfoImpl> scheduledJobs = new HashMap<>();
 
     /**
      * Create the scheduler
@@ -241,7 +241,7 @@ public class JobSchedulerImpl
                             break;
                     }
                     // Create configuration for scheduled job
-                    final Map<String, Serializable> config = new HashMap<String, Serializable>();
+                    final Map<String, Serializable> config = new HashMap<>();
                     config.put(PROPERTY_READ_JOB, info);
                     config.put(PROPERTY_SCHEDULE_INDEX, index);
                     this.scheduler.schedule(this, options.name(name).config(config).canRunConcurrently(false));
@@ -305,7 +305,7 @@ public class JobSchedulerImpl
                 this.scheduledJobHandler.remove(info);
             } else {
                 // update schedule list
-                final List<ScheduleInfo> infos = new ArrayList<ScheduleInfo>();
+                final List<ScheduleInfo> infos = new ArrayList<>();
                 for(final ScheduleInfo i : info.getSchedules() ) {
                     if ( i != si ) { // no need to use equals
                         infos.add(i);
@@ -424,7 +424,7 @@ public class JobSchedulerImpl
     public Collection<ScheduledJobInfo> getScheduledJobs(final String topic,
             final long limit,
             final Map<String, Object>... templates) {
-        final List<ScheduledJobInfo> jobs = new ArrayList<ScheduledJobInfo>();
+        final List<ScheduledJobInfo> jobs = new ArrayList<>();
         long count = 0;
         synchronized ( this.scheduledJobs ) {
             for(final ScheduledJobInfoImpl job : this.scheduledJobs.values() ) {
@@ -504,7 +504,7 @@ public class JobSchedulerImpl
             final boolean isSuspended,
             final List<ScheduleInfoImpl> scheduleInfos,
             final List<String> errors) {
-        final List<String> msgs = new ArrayList<String>();
+        final List<String> msgs = new ArrayList<>();
         if ( scheduleName == null || scheduleName.length() == 0 ) {
             msgs.add("Schedule name not specified");
         }