You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by st...@apache.org on 2018/07/17 08:48:14 UTC
[sling-org-apache-sling-event] branch master updated: SLING-7778 :
fix race-condition between adding a scheduled job,
removing it and observation
This is an automated email from the ASF dual-hosted git repository.
stefanegli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-event.git
The following commit(s) were added to refs/heads/master by this push:
new 7d356b4 SLING-7778 : fix race-condition between adding a scheduled job, removing it and observation
7d356b4 is described below
commit 7d356b47a135823b81402788e3a6cb3ed9b667e2
Author: Stefan Egli <st...@apache.org>
AuthorDate: Tue Jul 17 09:42:10 2018 +0200
SLING-7778 : fix race-condition between adding a scheduled job, removing it and observation
---
.../impl/jobs/scheduling/ScheduledJobHandler.java | 1 +
.../org/apache/sling/event/it/SchedulingTest.java | 42 ++++++++++++++++++++++
2 files changed, 43 insertions(+)
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java b/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java
index 0df2a9c..f925e4a 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java
@@ -223,6 +223,7 @@ public class ScheduledJobHandler implements Runnable {
holder.read = System.currentTimeMillis();
holder.info = this.addOrUpdateScheduledJob(properties, h == null ? null : h.info);
+ this.scheduledJobs.put(key, holder);
this.jobScheduler.scheduleJob(holder.info);
return holder.info;
}
diff --git a/src/test/java/org/apache/sling/event/it/SchedulingTest.java b/src/test/java/org/apache/sling/event/it/SchedulingTest.java
index 567ee70..4107a3b 100644
--- a/src/test/java/org/apache/sling/event/it/SchedulingTest.java
+++ b/src/test/java/org/apache/sling/event/it/SchedulingTest.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.io.IOException;
+import java.util.Date;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.sling.event.jobs.Job;
@@ -32,12 +34,16 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.junit.PaxExam;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@RunWith(PaxExam.class)
public class SchedulingTest extends AbstractJobHandlingTest {
private static final String TOPIC = "job/scheduled/topic";
+ private final Logger logger = LoggerFactory.getLogger(getClass());
+
@Override
@Before
public void setup() throws IOException {
@@ -84,4 +90,40 @@ public class SchedulingTest extends AbstractJobHandlingTest {
info2.unschedule();
assertEquals(0, this.getJobManager().getScheduledJobs().size()); // scheduled jobs
}
+
+ @Test
+ public void schedulingLoadTest() throws Exception {
+ logger.info("schedulingLoadTest: start");
+ final AtomicInteger counter = new AtomicInteger();
+ final int NUM_ITERATIONS = 1500;
+ final String ownTopic = "random/" + UUID.randomUUID().toString();
+ this.registerJobConsumer(ownTopic, new JobConsumer() {
+
+ @Override
+ public JobResult process(final Job job) {
+ if ( job.getTopic().equals(ownTopic) ) {
+ counter.incrementAndGet();
+ }
+ return JobResult.OK;
+ }
+
+ });
+ for(int i=0; i<NUM_ITERATIONS; i++) {
+ logger.info("schedulingLoadTest: loop-" + i);
+ this.getJobManager().createJob(ownTopic).schedule().at(new Date(System.currentTimeMillis() + 2500)).add();
+ Thread.sleep(1);
+ }
+ logger.info("schedulingLoadTest: done, letting jobs be triggered, currently at {} jobs, {} schedules", counter.get(), this.getJobManager().getScheduledJobs().size());
+ final long timeout = System.currentTimeMillis() + 30000;
+ while(System.currentTimeMillis() < timeout) {
+ if ((counter.get() == NUM_ITERATIONS) && (this.getJobManager().getScheduledJobs().size() == 0)) {
+ break;
+ }
+ logger.info("schedulingLoadTest: currently at {} jobs, {} schedules", counter.get(), getJobManager().getScheduledJobs().size());
+ Thread.sleep(100);
+ }
+ assertEquals(NUM_ITERATIONS, counter.get());
+ assertEquals(0, this.getJobManager().getScheduledJobs().size());
+ logger.info("schedulingLoadTest: end");
+ }
}