You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by st...@apache.org on 2018/07/17 08:48:14 UTC

[sling-org-apache-sling-event] branch master updated: SLING-7778 : fix race-condition between adding a scheduled job, removing it and observation

This is an automated email from the ASF dual-hosted git repository.

stefanegli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-event.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d356b4  SLING-7778 : fix race-condition between adding a scheduled job, removing it and observation
7d356b4 is described below

commit 7d356b47a135823b81402788e3a6cb3ed9b667e2
Author: Stefan Egli <st...@apache.org>
AuthorDate: Tue Jul 17 09:42:10 2018 +0200

    SLING-7778 : fix race-condition between adding a scheduled job, removing it and observation
---
 .../impl/jobs/scheduling/ScheduledJobHandler.java  |  1 +
 .../org/apache/sling/event/it/SchedulingTest.java  | 42 ++++++++++++++++++++++
 2 files changed, 43 insertions(+)

diff --git a/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java b/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java
index 0df2a9c..f925e4a 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java
@@ -223,6 +223,7 @@ public class ScheduledJobHandler implements Runnable {
             holder.read = System.currentTimeMillis();
             holder.info = this.addOrUpdateScheduledJob(properties, h == null ? null : h.info);
 
+            this.scheduledJobs.put(key, holder);
             this.jobScheduler.scheduleJob(holder.info);
             return holder.info;
         }
diff --git a/src/test/java/org/apache/sling/event/it/SchedulingTest.java b/src/test/java/org/apache/sling/event/it/SchedulingTest.java
index 567ee70..4107a3b 100644
--- a/src/test/java/org/apache/sling/event/it/SchedulingTest.java
+++ b/src/test/java/org/apache/sling/event/it/SchedulingTest.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
 import java.io.IOException;
+import java.util.Date;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.sling.event.jobs.Job;
@@ -32,12 +34,16 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.ops4j.pax.exam.junit.PaxExam;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @RunWith(PaxExam.class)
 public class SchedulingTest extends AbstractJobHandlingTest {
 
     private static final String TOPIC = "job/scheduled/topic";
 
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
     @Override
     @Before
     public void setup() throws IOException {
@@ -84,4 +90,40 @@ public class SchedulingTest extends AbstractJobHandlingTest {
         info2.unschedule();
         assertEquals(0, this.getJobManager().getScheduledJobs().size()); // scheduled jobs
     }
+
+    @Test
+    public void schedulingLoadTest() throws Exception {
+        logger.info("schedulingLoadTest: start");
+        final AtomicInteger counter = new AtomicInteger();
+        final int NUM_ITERATIONS = 1500;
+        final String ownTopic = "random/" + UUID.randomUUID().toString();
+        this.registerJobConsumer(ownTopic, new JobConsumer() {
+
+            @Override
+            public JobResult process(final Job job) {
+                if ( job.getTopic().equals(ownTopic) ) {
+                    counter.incrementAndGet();
+                }
+                return JobResult.OK;
+            }
+
+        });
+        for(int i=0; i<NUM_ITERATIONS; i++) {
+            logger.info("schedulingLoadTest: loop-" + i);
+            this.getJobManager().createJob(ownTopic).schedule().at(new Date(System.currentTimeMillis() + 2500)).add();
+            Thread.sleep(1);
+        }
+        logger.info("schedulingLoadTest: done, letting jobs be triggered, currently at {} jobs, {} schedules", counter.get(), this.getJobManager().getScheduledJobs().size());
+        final long timeout = System.currentTimeMillis() + 30000;
+        while(System.currentTimeMillis() < timeout) {
+            if ((counter.get() == NUM_ITERATIONS) && (this.getJobManager().getScheduledJobs().size() == 0)) {
+                break;
+            }
+            logger.info("schedulingLoadTest: currently at {} jobs, {} schedules", counter.get(), getJobManager().getScheduledJobs().size());
+            Thread.sleep(100);
+        }
+        assertEquals(NUM_ITERATIONS, counter.get());
+        assertEquals(0, this.getJobManager().getScheduledJobs().size());
+        logger.info("schedulingLoadTest: end");
+    }
 }