You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/10/09 14:39:11 UTC
svn commit: r1530585 - in /sling/trunk/bundles/extensions/event/src:
main/java/org/apache/sling/event/impl/jobs/
main/java/org/apache/sling/event/impl/support/
test/java/org/apache/sling/event/it/
Author: cziegeler
Date: Wed Oct 9 12:39:10 2013
New Revision: 1530585
URL: http://svn.apache.org/r1530585
Log:
SLING-3139 : Provide a way to schedule jobs
Added:
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DeprecatedTimedJobsTest.java (with props)
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfo.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java?rev=1530585&r1=1530584&r2=1530585&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java Wed Oct 9 12:39:10 2013
@@ -196,9 +196,9 @@ public class JobSchedulerImpl
final String jobTopic = (String) properties.remove(JobUtil.PROPERTY_JOB_TOPIC);
final String jobName = (String) properties.remove(JobUtil.PROPERTY_JOB_NAME);
- final String schedulerName = (String) properties.remove(ResourceHelper.PROPERTY_SCHEDULER_NAME);
- final ScheduleInfo scheduleInfo = (ScheduleInfo) properties.remove(ResourceHelper.PROPERTY_SCHEDULER_INFO);
- final boolean isSuspended = properties.remove(ResourceHelper.PROPERTY_SCHEDULER_SUSPENDED) != null;
+ final String schedulerName = (String) properties.remove(ResourceHelper.PROPERTY_SCHEDULE_NAME);
+ final ScheduleInfo scheduleInfo = (ScheduleInfo) properties.remove(ResourceHelper.PROPERTY_SCHEDULE_INFO);
+ final boolean isSuspended = properties.remove(ResourceHelper.PROPERTY_SCHEDULE_SUSPENDED) != null;
// and now schedule
final String key = ResourceHelper.filterName(schedulerName);
ScheduledJobInfoImpl info;
@@ -504,7 +504,7 @@ public class JobSchedulerImpl
final String jobTopic,
final String jobName,
final Map<String, Object> jobProperties,
- final String schedulerName,
+ final String scheduleName,
final boolean suspend,
final ScheduleInfo scheduleInfo)
throws PersistenceException {
@@ -532,16 +532,16 @@ public class JobSchedulerImpl
properties.put(Job.PROPERTY_JOB_CREATED_INSTANCE, Environment.APPLICATION_ID);
// put scheduler name and scheduler info
- properties.put(ResourceHelper.PROPERTY_SCHEDULER_NAME, schedulerName);
- properties.put(ResourceHelper.PROPERTY_SCHEDULER_INFO, scheduleInfo);
+ properties.put(ResourceHelper.PROPERTY_SCHEDULE_NAME, scheduleName);
+ properties.put(ResourceHelper.PROPERTY_SCHEDULE_INFO, scheduleInfo.getSerializedString());
if ( suspend ) {
- properties.put(ResourceHelper.PROPERTY_SCHEDULER_SUSPENDED, Boolean.TRUE);
+ properties.put(ResourceHelper.PROPERTY_SCHEDULE_SUSPENDED, Boolean.TRUE);
}
// create path and resource
properties.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, ResourceHelper.RESOURCE_TYPE_SCHEDULED_JOB);
- final String path = this.config.getScheduledJobsPathWithSlash() + ResourceHelper.filterName(schedulerName);
+ final String path = this.config.getScheduledJobsPathWithSlash() + ResourceHelper.filterName(scheduleName);
// update existing resource
final Resource existingInfo = resolver.getResource(path);
@@ -635,9 +635,9 @@ public class JobSchedulerImpl
if ( eventResource != null ) {
final ModifiableValueMap mvm = eventResource.adaptTo(ModifiableValueMap.class);
if ( flag ) {
- mvm.put(ResourceHelper.PROPERTY_SCHEDULER_SUSPENDED, Boolean.TRUE);
+ mvm.put(ResourceHelper.PROPERTY_SCHEDULE_SUSPENDED, Boolean.TRUE);
} else {
- mvm.remove(ResourceHelper.PROPERTY_SCHEDULER_SUSPENDED);
+ mvm.remove(ResourceHelper.PROPERTY_SCHEDULE_SUSPENDED);
}
resolver.commit();
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java?rev=1530585&r1=1530584&r2=1530585&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java Wed Oct 9 12:39:10 2013
@@ -56,9 +56,9 @@ public abstract class ResourceHelper {
public static final String BUNDLE_EVENT_STARTED = "org/osgi/framework/BundleEvent/STARTED";
- public static final String PROPERTY_SCHEDULER_NAME = "slingevent:schedulerName";
- public static final String PROPERTY_SCHEDULER_INFO = "slingevent:schedulerInfo";
- public static final String PROPERTY_SCHEDULER_SUSPENDED = "slingevent:schedulerSuspended";
+ public static final String PROPERTY_SCHEDULE_NAME = "slingevent:scheduleName";
+ public static final String PROPERTY_SCHEDULE_INFO = "slingevent:scheduleInfo";
+ public static final String PROPERTY_SCHEDULE_SUSPENDED = "slingevent:scheduleSuspended";
/** List of ignored properties to write to the repository. */
@SuppressWarnings("deprecation")
@@ -81,9 +81,9 @@ public abstract class ResourceHelper {
Job.PROPERTY_FINISHED_DATE,
JobImpl.PROPERTY_FINISHED_STATE,
Job.PROPERTY_RESULT_MESSAGE,
- PROPERTY_SCHEDULER_INFO,
- PROPERTY_SCHEDULER_NAME,
- PROPERTY_SCHEDULER_SUSPENDED
+ PROPERTY_SCHEDULE_INFO,
+ PROPERTY_SCHEDULE_NAME,
+ PROPERTY_SCHEDULE_SUSPENDED
};
/**
@@ -158,6 +158,14 @@ public abstract class ResourceHelper {
try {
final Map<String, Object> result = new HashMap<String, Object>(vm);
for(final Map.Entry<String, Object> entry : result.entrySet()) {
+ if ( entry.getKey().equals(PROPERTY_SCHEDULE_INFO) ) {
+ final ScheduleInfo info = ScheduleInfo.deserialize(entry.getValue().toString());
+ if ( info == null ) {
+ hasReadError.add(new Exception("Unable to deserialize property '" + entry.getKey() + "'"));
+ } else {
+ entry.setValue(info);
+ }
+ }
if ( entry.getValue() instanceof InputStream ) {
final Object value = vm.get(entry.getKey(), Serializable.class);
if ( value != null ) {
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfo.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfo.java?rev=1530585&r1=1530584&r2=1530585&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfo.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfo.java Wed Oct 9 12:39:10 2013
@@ -18,7 +18,6 @@
*/
package org.apache.sling.event.impl.support;
-import java.io.IOException;
import java.io.Serializable;
import java.util.Date;
@@ -29,7 +28,7 @@ public class ScheduleInfo implements Ser
private static final long serialVersionUID = 1L;
/** Serialization version. */
- private static final int VERSION = 1;
+ private static final String VERSION = "1";
public static ScheduleInfo HOURLY(final int minutes) {
return new ScheduleInfo(ScheduleType.HOURLY, -1, -1, minutes, null);
@@ -47,15 +46,15 @@ public class ScheduleInfo implements Ser
return new ScheduleInfo(ScheduleType.DAILY, -1, hour, minute, null);
}
- private ScheduleType scheduleType;
+ private final ScheduleType scheduleType;
- private int dayOfWeek;
+ private final int dayOfWeek;
- private int hourOfDay;
+ private final int hourOfDay;
- private int minuteOfHour;
+ private final int minuteOfHour;
- private Date at;
+ private final Date at;
private ScheduleInfo(final ScheduleType scheduleType,
final int dayOfWeek,
@@ -69,39 +68,39 @@ public class ScheduleInfo implements Ser
this.at = at;
}
- /**
- * Serialize the object
- * - write version id
- * - serialize each entry
- * @param out Object output stream
- * @throws IOException
- */
- private void writeObject(final java.io.ObjectOutputStream out)
- throws IOException {
- out.writeInt(VERSION);
- out.writeObject(this.scheduleType.name());
- out.writeInt(this.dayOfWeek);
- out.writeInt(this.hourOfDay);
- out.writeInt(this.minuteOfHour);
- out.writeObject(this.at);
- }
-
- /**
- * Deserialize the object
- * - read version id
- * - deserialize each entry
- */
- private void readObject(final java.io.ObjectInputStream in)
- throws IOException, ClassNotFoundException {
- final int version = in.readInt();
- if ( version < 1 || version > VERSION ) {
- throw new ClassNotFoundException(this.getClass().getName());
+ public static ScheduleInfo deserialize(final String s) {
+ final String[] parts = s.split(":");
+ if ( parts.length == 6 && parts[0].equals(VERSION) ) {
+ try {
+ return new ScheduleInfo(ScheduleType.valueOf(parts[1]),
+ Integer.parseInt(parts[2]),
+ Integer.parseInt(parts[3]),
+ Integer.parseInt(parts[4]),
+ (parts[5].equals("null") ? null : new Date(Long.parseLong(parts[5]))));
+ } catch ( final IllegalArgumentException iae) {
+ // ignore and return null
+ }
}
- this.scheduleType = ScheduleType.valueOf((String)in.readObject());
- this.dayOfWeek = in.readInt();
- this.hourOfDay = in.readInt();
- this.minuteOfHour = in.readInt();
- this.at = (Date) in.readObject();
+ return null;
+ }
+ public String getSerializedString() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append(VERSION);
+ sb.append(":");
+ sb.append(this.scheduleType.name());
+ sb.append(":");
+ sb.append(String.valueOf(this.dayOfWeek));
+ sb.append(":");
+ sb.append(String.valueOf(this.hourOfDay));
+ sb.append(":");
+ sb.append(String.valueOf(this.minuteOfHour));
+ sb.append(":");
+ if ( at == null ) {
+ sb.append("null");
+ } else {
+ sb.append(String.valueOf(at.getTime()));
+ }
+ return sb.toString();
}
public Date getAt() {
Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DeprecatedTimedJobsTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DeprecatedTimedJobsTest.java?rev=1530585&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DeprecatedTimedJobsTest.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DeprecatedTimedJobsTest.java Wed Oct 9 12:39:10 2013
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.it;
+
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.inject.Inject;
+
+import org.apache.sling.event.EventUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.junit.PaxExam;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerMethod;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventHandler;
+
+@RunWith(PaxExam.class)
+@ExamReactorStrategy(PerMethod.class)
+public class DeprecatedTimedJobsTest extends AbstractJobHandlingTest {
+
+ private static final String TOPIC = "timed/test/topic";
+
+ @Inject
+ private EventAdmin eventAdmin;
+
+ @Override
+ @Before
+ public void setup() throws IOException {
+ super.setup();
+
+ this.sleep(1000L);
+ }
+
+ @Test(timeout = DEFAULT_TEST_TIMEOUT)
+ public void testTimedJob() throws Exception {
+ final AtomicInteger counter = new AtomicInteger();
+
+ final ServiceRegistration ehReg = this.registerEventHandler(TOPIC, new EventHandler() {
+
+ @Override
+ public void handleEvent(final Event event) {
+ if ( TOPIC.equals(event.getTopic()) ) {
+ counter.incrementAndGet();
+ }
+ }
+ });
+ try {
+ final Date d = new Date();
+ d.setTime(System.currentTimeMillis() + 2000); // run in 2 seconds
+ // send timed event
+ final Dictionary<String, Object> props = new Hashtable<String, Object>();
+ props.put(EventUtil.PROPERTY_TIMED_EVENT_TOPIC, TOPIC);
+ props.put(EventUtil.PROPERTY_TIMED_EVENT_DATE, d);
+ this.eventAdmin.sendEvent(new Event(EventUtil.TOPIC_TIMED_EVENT, props));
+
+ while ( counter.get() == 0 ) {
+ this.sleep(1000);
+ }
+ } finally {
+ ehReg.unregister();
+ }
+ }
+
+ @Test(timeout = DEFAULT_TEST_TIMEOUT)
+ public void testPeriodicTimedJob() throws Exception {
+ final AtomicInteger counter = new AtomicInteger();
+
+ final ServiceRegistration ehReg = this.registerEventHandler(TOPIC, new EventHandler() {
+
+ @Override
+ public void handleEvent(final Event event) {
+ if ( TOPIC.equals(event.getTopic()) ) {
+ counter.incrementAndGet();
+ }
+ }
+ });
+ try {
+ // send timed event
+ final Dictionary<String, Object> props = new Hashtable<String, Object>();
+ props.put(EventUtil.PROPERTY_TIMED_EVENT_TOPIC, TOPIC);
+ props.put(EventUtil.PROPERTY_TIMED_EVENT_PERIOD, 1L);
+ props.put(EventUtil.PROPERTY_TIMED_EVENT_ID, "id");
+ this.eventAdmin.sendEvent(new Event(EventUtil.TOPIC_TIMED_EVENT, props));
+
+ while ( counter.get() < 5 ) {
+ this.sleep(1000);
+ }
+ final Dictionary<String, Object> props2 = new Hashtable<String, Object>();
+ props2.put(EventUtil.PROPERTY_TIMED_EVENT_TOPIC, TOPIC);
+ props2.put(EventUtil.PROPERTY_TIMED_EVENT_ID, "id");
+
+ this.eventAdmin.sendEvent(new Event(EventUtil.TOPIC_TIMED_EVENT, props2));
+ int current = counter.get();
+ this.sleep(2000);
+ if ( counter.get() != current && counter.get() != current + 1 ) {
+ fail("Events are still sent");
+ }
+ } finally {
+ ehReg.unregister();
+ }
+ }
+}
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DeprecatedTimedJobsTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DeprecatedTimedJobsTest.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DeprecatedTimedJobsTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java?rev=1530585&r1=1530584&r2=1530585&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java Wed Oct 9 12:39:10 2013
@@ -18,17 +18,15 @@
*/
package org.apache.sling.event.it;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Date;
-import java.util.Dictionary;
-import java.util.Hashtable;
import java.util.concurrent.atomic.AtomicInteger;
-import javax.inject.Inject;
-
-import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -36,9 +34,6 @@ import org.ops4j.pax.exam.junit.PaxExam;
import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
import org.ops4j.pax.exam.spi.reactors.PerMethod;
import org.osgi.framework.ServiceRegistration;
-import org.osgi.service.event.Event;
-import org.osgi.service.event.EventAdmin;
-import org.osgi.service.event.EventHandler;
@RunWith(PaxExam.class)
@ExamReactorStrategy(PerMethod.class)
@@ -46,9 +41,6 @@ public class TimedJobsTest extends Abstr
private static final String TOPIC = "timed/test/topic";
- @Inject
- private EventAdmin eventAdmin;
-
@Override
@Before
public void setup() throws IOException {
@@ -61,66 +53,29 @@ public class TimedJobsTest extends Abstr
public void testTimedJob() throws Exception {
final AtomicInteger counter = new AtomicInteger();
- final ServiceRegistration ehReg = this.registerEventHandler(TOPIC, new EventHandler() {
+ final ServiceRegistration ehReg = this.registerJobConsumer(TOPIC, new JobConsumer() {
@Override
- public void handleEvent(final Event event) {
- if ( TOPIC.equals(event.getTopic()) ) {
+ public JobResult process(final Job job) {
+ if ( job.getTopic().equals(TOPIC) ) {
counter.incrementAndGet();
}
+ return JobResult.OK;
}
+
});
try {
final Date d = new Date();
d.setTime(System.currentTimeMillis() + 2000); // run in 2 seconds
- // send timed event
- final Dictionary<String, Object> props = new Hashtable<String, Object>();
- props.put(EventUtil.PROPERTY_TIMED_EVENT_TOPIC, TOPIC);
- props.put(EventUtil.PROPERTY_TIMED_EVENT_DATE, d);
- this.eventAdmin.sendEvent(new Event(EventUtil.TOPIC_TIMED_EVENT, props));
- while ( counter.get() == 0 ) {
- this.sleep(1000);
- }
- } finally {
- ehReg.unregister();
- }
- }
+ // create scheduled job
+ assertTrue(this.getJobManager().createJob(TOPIC).schedule("simpleTest").at(d));
- @Test(timeout = DEFAULT_TEST_TIMEOUT)
- public void testPeriodicTimedJob() throws Exception {
- final AtomicInteger counter = new AtomicInteger();
-
- final ServiceRegistration ehReg = this.registerEventHandler(TOPIC, new EventHandler() {
-
- @Override
- public void handleEvent(final Event event) {
- if ( TOPIC.equals(event.getTopic()) ) {
- counter.incrementAndGet();
- }
- }
- });
- try {
- // send timed event
- final Dictionary<String, Object> props = new Hashtable<String, Object>();
- props.put(EventUtil.PROPERTY_TIMED_EVENT_TOPIC, TOPIC);
- props.put(EventUtil.PROPERTY_TIMED_EVENT_PERIOD, 1L);
- props.put(EventUtil.PROPERTY_TIMED_EVENT_ID, "id");
- this.eventAdmin.sendEvent(new Event(EventUtil.TOPIC_TIMED_EVENT, props));
-
- while ( counter.get() < 5 ) {
+ while ( counter.get() == 0 ) {
this.sleep(1000);
}
- final Dictionary<String, Object> props2 = new Hashtable<String, Object>();
- props2.put(EventUtil.PROPERTY_TIMED_EVENT_TOPIC, TOPIC);
- props2.put(EventUtil.PROPERTY_TIMED_EVENT_ID, "id");
-
- this.eventAdmin.sendEvent(new Event(EventUtil.TOPIC_TIMED_EVENT, props2));
- int current = counter.get();
- this.sleep(2000);
- if ( counter.get() != current && counter.get() != current + 1 ) {
- fail("Events are still sent");
- }
+ this.sleep(1000);
+ assertEquals(0, this.getJobManager().getScheduledJobs().size());
} finally {
ehReg.unregister();
}