You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2017/05/04 09:07:53 UTC
svn commit: r1793757 - in /sling/trunk/bundles/extensions/event/resource: ./
src/main/java/org/apache/sling/event/impl/jobs/config/
src/main/java/org/apache/sling/event/impl/jobs/queues/
src/main/java/org/apache/sling/event/impl/jobs/scheduling/
Author: cziegeler
Date: Thu May 4 09:07:53 2017
New Revision: 1793757
URL: http://svn.apache.org/viewvc?rev=1793757&view=rev
Log:
SLING-6823 : Use Timer instead of scheduler for delayed execution
Modified:
sling/trunk/bundles/extensions/event/resource/pom.xml
sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java
sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java
Modified: sling/trunk/bundles/extensions/event/resource/pom.xml
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/resource/pom.xml?rev=1793757&r1=1793756&r2=1793757&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/resource/pom.xml (original)
+++ sling/trunk/bundles/extensions/event/resource/pom.xml Thu May 4 09:07:53 2017
@@ -82,16 +82,8 @@
org.apache.sling.commons.osgi;inline="org/apache/sling/commons/osgi/PropertiesUtil.*",
quartz;inline="org/quartz/CronExpression.*|org/quartz/ValueSet.*"
</Embed-Dependency>
- <_plugin>org.apache.felix.scrplugin.bnd.SCRDescriptorBndPlugin;destdir=${project.build.outputDirectory};</_plugin>
</instructions>
</configuration>
- <dependencies>
- <dependency>
- <groupId>org.apache.felix</groupId>
- <artifactId>org.apache.felix.scr.bnd</artifactId>
- <version>1.7.2</version>
- </dependency>
- </dependencies>
</plugin>
<plugin>
<groupId>org.apache.rat</groupId>
Modified: sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java?rev=1793757&r1=1793756&r2=1793757&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java Thu May 4 09:07:53 2017
@@ -18,12 +18,13 @@
*/
package org.apache.sling.event.impl.jobs.config;
-import java.sql.Date;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -494,27 +495,22 @@ public class JobManagerConfiguration {
} else {
// and run checker again in some seconds (if leader)
// notify listeners afterwards
- final Scheduler local = this.scheduler;
- if ( local != null ) {
- final Runnable r = new Runnable() {
-
- @Override
- public void run() {
- if ( newCaps == topologyCapabilities && newCaps.isActive()) {
- // start listeners
- notifiyListeners();
- if ( newCaps.isLeader() && newCaps.isActive() ) {
- final CheckTopologyTask mt = new CheckTopologyTask(JobManagerConfiguration.this);
- mt.fullRun();
- }
+ final Timer timer = new Timer();
+ timer.schedule(new TimerTask()
+ {
+
+ @Override
+ public void run() {
+ if ( newCaps == topologyCapabilities && newCaps.isActive()) {
+ // start listeners
+ notifiyListeners();
+ if ( newCaps.isLeader() && newCaps.isActive() ) {
+ final CheckTopologyTask mt = new CheckTopologyTask(JobManagerConfiguration.this);
+ mt.fullRun();
}
}
- };
- if ( !local.schedule(r, local.AT(new Date(System.currentTimeMillis() + this.backgroundLoadDelay * 1000))) ) {
- // if for whatever reason scheduling doesn't work, let's run now
- r.run();
}
- }
+ }, this.backgroundLoadDelay * 1000);
}
logger.debug("Job processing started");
}
Modified: sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java?rev=1793757&r1=1793756&r2=1793757&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java (original)
+++ sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java Thu May 4 09:07:53 2017
@@ -23,6 +23,8 @@ import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -80,7 +82,7 @@ public class JobQueueImpl
private final QueueServices services;
/** The map of events we're processing. */
- private final Map<String, JobHandler> processingJobsLists = new HashMap<String, JobHandler>();
+ private final Map<String, JobHandler> processingJobsLists = new HashMap<>();
private final ThreadPool threadPool;
@@ -676,7 +678,6 @@ public class JobQueueImpl
this.isSleepingUntil = fireDate.getTime();
}
- final String jobName = "Waiting:" + queueName + ":" + handler.hashCode();
final Runnable t = new Runnable() {
@Override
public void run() {
@@ -695,10 +696,14 @@ public class JobQueueImpl
}
};
this.waitCounter.incrementAndGet();
- if ( !services.scheduler.schedule(t, services.scheduler.AT(fireDate).name(jobName)) ) {
- // if scheduling fails run the thread directly
- t.run();
- }
+ final Timer timer = new Timer();
+ timer.schedule(new TimerTask() {
+
+ @Override
+ public void run() {
+ t.run();
+ }
+ }, delay);
} else {
// put directly into queue
this.requeue(handler);
Modified: sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java?rev=1793757&r1=1793756&r2=1793757&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java (original)
+++ sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java Thu May 4 09:07:53 2017
@@ -85,9 +85,6 @@ public class QueueManager
private EventAdmin eventAdmin;
@Reference
- private Scheduler scheduler;
-
- @Reference
private JobConsumerManager jobConsumerManager;
@Reference
@@ -135,7 +132,6 @@ public class QueueManager
queueServices.configuration = this.configuration;
queueServices.eventAdmin = this.eventAdmin;
queueServices.jobConsumerManager = this.jobConsumerManager;
- queueServices.scheduler = this.scheduler;
queueServices.threadPoolManager = this.threadPoolManager;
queueServices.statisticsManager = statisticsManager;
queueServices.eventingThreadPool = this.threadPool;
Modified: sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java?rev=1793757&r1=1793756&r2=1793757&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java (original)
+++ sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueServices.java Thu May 4 09:07:53 2017
@@ -18,7 +18,6 @@
*/
package org.apache.sling.event.impl.jobs.queues;
-import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.commons.threads.ThreadPool;
import org.apache.sling.commons.threads.ThreadPoolManager;
import org.apache.sling.event.impl.jobs.JobConsumerManager;
@@ -41,8 +40,6 @@ public class QueueServices {
public ThreadPoolManager threadPoolManager;
- public Scheduler scheduler;
-
public StatisticsManager statisticsManager;
public ThreadPool eventingThreadPool;
Modified: sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java?rev=1793757&r1=1793756&r2=1793757&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/resource/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java Thu May 4 09:07:53 2017
@@ -88,7 +88,7 @@ public class JobSchedulerImpl
private final ScheduledJobHandler scheduledJobHandler;
/** All scheduled jobs, by scheduler name */
- private final Map<String, ScheduledJobInfoImpl> scheduledJobs = new HashMap<String, ScheduledJobInfoImpl>();
+ private final Map<String, ScheduledJobInfoImpl> scheduledJobs = new HashMap<>();
/**
* Create the scheduler
@@ -241,7 +241,7 @@ public class JobSchedulerImpl
break;
}
// Create configuration for scheduled job
- final Map<String, Serializable> config = new HashMap<String, Serializable>();
+ final Map<String, Serializable> config = new HashMap<>();
config.put(PROPERTY_READ_JOB, info);
config.put(PROPERTY_SCHEDULE_INDEX, index);
this.scheduler.schedule(this, options.name(name).config(config).canRunConcurrently(false));
@@ -305,7 +305,7 @@ public class JobSchedulerImpl
this.scheduledJobHandler.remove(info);
} else {
// update schedule list
- final List<ScheduleInfo> infos = new ArrayList<ScheduleInfo>();
+ final List<ScheduleInfo> infos = new ArrayList<>();
for(final ScheduleInfo i : info.getSchedules() ) {
if ( i != si ) { // no need to use equals
infos.add(i);
@@ -424,7 +424,7 @@ public class JobSchedulerImpl
public Collection<ScheduledJobInfo> getScheduledJobs(final String topic,
final long limit,
final Map<String, Object>... templates) {
- final List<ScheduledJobInfo> jobs = new ArrayList<ScheduledJobInfo>();
+ final List<ScheduledJobInfo> jobs = new ArrayList<>();
long count = 0;
synchronized ( this.scheduledJobs ) {
for(final ScheduledJobInfoImpl job : this.scheduledJobs.values() ) {
@@ -504,7 +504,7 @@ public class JobSchedulerImpl
final boolean isSuspended,
final List<ScheduleInfoImpl> scheduleInfos,
final List<String> errors) {
- final List<String> msgs = new ArrayList<String>();
+ final List<String> msgs = new ArrayList<>();
if ( scheduleName == null || scheduleName.length() == 0 ) {
msgs.add("Schedule name not specified");
}