You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/10/09 14:39:11 UTC

svn commit: r1530585 - in /sling/trunk/bundles/extensions/event/src: main/java/org/apache/sling/event/impl/jobs/ main/java/org/apache/sling/event/impl/support/ test/java/org/apache/sling/event/it/

Author: cziegeler
Date: Wed Oct  9 12:39:10 2013
New Revision: 1530585

URL: http://svn.apache.org/r1530585
Log:
SLING-3139 : Provide a way to schedule jobs

Added:
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DeprecatedTimedJobsTest.java   (with props)
Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfo.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java?rev=1530585&r1=1530584&r2=1530585&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobSchedulerImpl.java Wed Oct  9 12:39:10 2013
@@ -196,9 +196,9 @@ public class JobSchedulerImpl
 
                     final String jobTopic = (String) properties.remove(JobUtil.PROPERTY_JOB_TOPIC);
                     final String jobName = (String) properties.remove(JobUtil.PROPERTY_JOB_NAME);
-                    final String schedulerName = (String) properties.remove(ResourceHelper.PROPERTY_SCHEDULER_NAME);
-                    final ScheduleInfo scheduleInfo = (ScheduleInfo) properties.remove(ResourceHelper.PROPERTY_SCHEDULER_INFO);
-                    final boolean isSuspended = properties.remove(ResourceHelper.PROPERTY_SCHEDULER_SUSPENDED) != null;
+                    final String schedulerName = (String) properties.remove(ResourceHelper.PROPERTY_SCHEDULE_NAME);
+                    final ScheduleInfo scheduleInfo = (ScheduleInfo) properties.remove(ResourceHelper.PROPERTY_SCHEDULE_INFO);
+                    final boolean isSuspended = properties.remove(ResourceHelper.PROPERTY_SCHEDULE_SUSPENDED) != null;
                     // and now schedule
                     final String key = ResourceHelper.filterName(schedulerName);
                     ScheduledJobInfoImpl info;
@@ -504,7 +504,7 @@ public class JobSchedulerImpl
             final String jobTopic,
             final String jobName,
             final Map<String, Object> jobProperties,
-            final String schedulerName,
+            final String scheduleName,
             final boolean suspend,
             final ScheduleInfo scheduleInfo)
     throws PersistenceException {
@@ -532,16 +532,16 @@ public class JobSchedulerImpl
             properties.put(Job.PROPERTY_JOB_CREATED_INSTANCE, Environment.APPLICATION_ID);
 
             // put scheduler name and scheduler info
-            properties.put(ResourceHelper.PROPERTY_SCHEDULER_NAME, schedulerName);
-            properties.put(ResourceHelper.PROPERTY_SCHEDULER_INFO, scheduleInfo);
+            properties.put(ResourceHelper.PROPERTY_SCHEDULE_NAME, scheduleName);
+            properties.put(ResourceHelper.PROPERTY_SCHEDULE_INFO, scheduleInfo.getSerializedString());
             if ( suspend ) {
-                properties.put(ResourceHelper.PROPERTY_SCHEDULER_SUSPENDED, Boolean.TRUE);
+                properties.put(ResourceHelper.PROPERTY_SCHEDULE_SUSPENDED, Boolean.TRUE);
             }
 
             // create path and resource
             properties.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, ResourceHelper.RESOURCE_TYPE_SCHEDULED_JOB);
 
-            final String path = this.config.getScheduledJobsPathWithSlash() + ResourceHelper.filterName(schedulerName);
+            final String path = this.config.getScheduledJobsPathWithSlash() + ResourceHelper.filterName(scheduleName);
 
             // update existing resource
             final Resource existingInfo = resolver.getResource(path);
@@ -635,9 +635,9 @@ public class JobSchedulerImpl
             if ( eventResource != null ) {
                 final ModifiableValueMap mvm = eventResource.adaptTo(ModifiableValueMap.class);
                 if ( flag ) {
-                    mvm.put(ResourceHelper.PROPERTY_SCHEDULER_SUSPENDED, Boolean.TRUE);
+                    mvm.put(ResourceHelper.PROPERTY_SCHEDULE_SUSPENDED, Boolean.TRUE);
                 } else {
-                    mvm.remove(ResourceHelper.PROPERTY_SCHEDULER_SUSPENDED);
+                    mvm.remove(ResourceHelper.PROPERTY_SCHEDULE_SUSPENDED);
                 }
                 resolver.commit();
             }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java?rev=1530585&r1=1530584&r2=1530585&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java Wed Oct  9 12:39:10 2013
@@ -56,9 +56,9 @@ public abstract class ResourceHelper {
 
     public static final String BUNDLE_EVENT_STARTED = "org/osgi/framework/BundleEvent/STARTED";
 
-    public static final String PROPERTY_SCHEDULER_NAME = "slingevent:schedulerName";
-    public static final String PROPERTY_SCHEDULER_INFO = "slingevent:schedulerInfo";
-    public static final String PROPERTY_SCHEDULER_SUSPENDED = "slingevent:schedulerSuspended";
+    public static final String PROPERTY_SCHEDULE_NAME = "slingevent:scheduleName";
+    public static final String PROPERTY_SCHEDULE_INFO = "slingevent:scheduleInfo";
+    public static final String PROPERTY_SCHEDULE_SUSPENDED = "slingevent:scheduleSuspended";
 
     /** List of ignored properties to write to the repository. */
     @SuppressWarnings("deprecation")
@@ -81,9 +81,9 @@ public abstract class ResourceHelper {
         Job.PROPERTY_FINISHED_DATE,
         JobImpl.PROPERTY_FINISHED_STATE,
         Job.PROPERTY_RESULT_MESSAGE,
-        PROPERTY_SCHEDULER_INFO,
-        PROPERTY_SCHEDULER_NAME,
-        PROPERTY_SCHEDULER_SUSPENDED
+        PROPERTY_SCHEDULE_INFO,
+        PROPERTY_SCHEDULE_NAME,
+        PROPERTY_SCHEDULE_SUSPENDED
     };
 
     /**
@@ -158,6 +158,14 @@ public abstract class ResourceHelper {
         try {
             final Map<String, Object> result = new HashMap<String, Object>(vm);
             for(final Map.Entry<String, Object> entry : result.entrySet()) {
+                if ( entry.getKey().equals(PROPERTY_SCHEDULE_INFO) ) {
+                    final ScheduleInfo info = ScheduleInfo.deserialize(entry.getValue().toString());
+                    if ( info == null ) {
+                        hasReadError.add(new Exception("Unable to deserialize property '" + entry.getKey() + "'"));
+                    } else {
+                        entry.setValue(info);
+                    }
+                }
                 if ( entry.getValue() instanceof InputStream ) {
                     final Object value = vm.get(entry.getKey(), Serializable.class);
                     if ( value != null ) {

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfo.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfo.java?rev=1530585&r1=1530584&r2=1530585&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfo.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ScheduleInfo.java Wed Oct  9 12:39:10 2013
@@ -18,7 +18,6 @@
  */
 package org.apache.sling.event.impl.support;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.Date;
 
@@ -29,7 +28,7 @@ public class ScheduleInfo implements Ser
     private static final long serialVersionUID = 1L;
 
     /** Serialization version. */
-    private static final int VERSION = 1;
+    private static final String VERSION = "1";
 
     public static ScheduleInfo HOURLY(final int minutes) {
         return new ScheduleInfo(ScheduleType.HOURLY, -1, -1, minutes, null);
@@ -47,15 +46,15 @@ public class ScheduleInfo implements Ser
         return new ScheduleInfo(ScheduleType.DAILY, -1, hour, minute, null);
     }
 
-    private ScheduleType scheduleType;
+    private final ScheduleType scheduleType;
 
-    private int dayOfWeek;
+    private final int dayOfWeek;
 
-    private int hourOfDay;
+    private final int hourOfDay;
 
-    private int minuteOfHour;
+    private final int minuteOfHour;
 
-    private Date at;
+    private final Date at;
 
     private ScheduleInfo(final ScheduleType scheduleType,
             final int dayOfWeek,
@@ -69,39 +68,39 @@ public class ScheduleInfo implements Ser
         this.at = at;
     }
 
-    /**
-     * Serialize the object
-     * - write version id
-     * - serialize each entry
-     * @param out Object output stream
-     * @throws IOException
-     */
-    private void writeObject(final java.io.ObjectOutputStream out)
-            throws IOException {
-        out.writeInt(VERSION);
-        out.writeObject(this.scheduleType.name());
-        out.writeInt(this.dayOfWeek);
-        out.writeInt(this.hourOfDay);
-        out.writeInt(this.minuteOfHour);
-        out.writeObject(this.at);
-    }
-
-    /**
-     * Deserialize the object
-     * - read version id
-     * - deserialize each entry
-     */
-    private void readObject(final java.io.ObjectInputStream in)
-    throws IOException, ClassNotFoundException {
-        final int version = in.readInt();
-        if ( version < 1 || version > VERSION ) {
-            throw new ClassNotFoundException(this.getClass().getName());
+    public static ScheduleInfo deserialize(final String s) { // parses "version:type:dayOfWeek:hourOfDay:minuteOfHour:at"; returns null when the string is malformed
+        final String[] parts = s.split(":");
+        if ( parts.length == 6 && parts[0].equals(VERSION) ) { // exactly six fields and a matching format version are required
+            try {
+                return new ScheduleInfo(ScheduleType.valueOf(parts[1]),
+                        Integer.parseInt(parts[2]),
+                        Integer.parseInt(parts[3]),
+                        Integer.parseInt(parts[4]),
+                        (parts[5].equals("null") ? null : new Date(Long.parseLong(parts[5])))); // the literal "null" stands for "no date", otherwise epoch milliseconds
+            } catch ( final IllegalArgumentException iae) {
+                // unknown schedule type or non-numeric field (NumberFormatException is an IllegalArgumentException): fall through and return null
+            }
         }
-        this.scheduleType = ScheduleType.valueOf((String)in.readObject());
-        this.dayOfWeek = in.readInt();
-        this.hourOfDay = in.readInt();
-        this.minuteOfHour = in.readInt();
-        this.at = (Date) in.readObject();
+        return null;
+    }
+    public String getSerializedString() { // builds "version:type:dayOfWeek:hourOfDay:minuteOfHour:at", the format read back by deserialize(String)
+        final StringBuilder sb = new StringBuilder();
+        sb.append(VERSION);
+        sb.append(":");
+        sb.append(this.scheduleType.name());
+        sb.append(":");
+        sb.append(String.valueOf(this.dayOfWeek));
+        sb.append(":");
+        sb.append(String.valueOf(this.hourOfDay));
+        sb.append(":");
+        sb.append(String.valueOf(this.minuteOfHour));
+        sb.append(":");
+        if ( at == null ) {
+            sb.append("null"); // literal marker for "no date"; deserialize(String) maps it back to null
+        } else {
+            sb.append(String.valueOf(at.getTime())); // date stored as epoch milliseconds
+        }
+        return sb.toString();
     }
 
     public Date getAt() {

Added: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DeprecatedTimedJobsTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DeprecatedTimedJobsTest.java?rev=1530585&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DeprecatedTimedJobsTest.java (added)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DeprecatedTimedJobsTest.java Wed Oct  9 12:39:10 2013
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.it;
+
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.inject.Inject;
+
+import org.apache.sling.event.EventUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.junit.PaxExam;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerMethod;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventHandler;
+/** Integration test for timed events sent through the OSGi EventAdmin using the EventUtil timed-event topic and properties. */
+@RunWith(PaxExam.class)
+@ExamReactorStrategy(PerMethod.class)
+public class DeprecatedTimedJobsTest extends AbstractJobHandlingTest {
+
+    private static final String TOPIC = "timed/test/topic"; // topic requested for the timed events and the one the handlers are registered for
+
+    @Inject
+    private EventAdmin eventAdmin;
+
+    @Override
+    @Before
+    public void setup() throws IOException {
+        super.setup();
+
+        this.sleep(1000L); // NOTE(review): presumably gives the framework time to settle after setup — confirm
+    }
+    /** Sends a timed event dated two seconds in the future and waits until the handler has counted at least one delivery. */
+    @Test(timeout = DEFAULT_TEST_TIMEOUT)
+    public void testTimedJob() throws Exception {
+        final AtomicInteger counter = new AtomicInteger();
+
+        final ServiceRegistration ehReg = this.registerEventHandler(TOPIC, new EventHandler() {
+
+            @Override
+            public void handleEvent(final Event event) {
+                if ( TOPIC.equals(event.getTopic()) ) {
+                    counter.incrementAndGet();
+                }
+            }
+        });
+        try {
+            final Date d = new Date();
+            d.setTime(System.currentTimeMillis() + 2000); // run in 2 seconds
+            // send timed event carrying the target topic and the date computed above
+            final Dictionary<String, Object> props = new Hashtable<String, Object>();
+            props.put(EventUtil.PROPERTY_TIMED_EVENT_TOPIC, TOPIC);
+            props.put(EventUtil.PROPERTY_TIMED_EVENT_DATE, d);
+            this.eventAdmin.sendEvent(new Event(EventUtil.TOPIC_TIMED_EVENT, props));
+
+            while ( counter.get() == 0 ) { // polls until the first delivery; only the test timeout bounds this loop
+                this.sleep(1000);
+            }
+        } finally {
+            ehReg.unregister();
+        }
+    }
+    /** Sends a periodic timed event, waits for five deliveries, re-sends the same id without period or date, then checks that deliveries stop. */
+    @Test(timeout = DEFAULT_TEST_TIMEOUT)
+    public void testPeriodicTimedJob() throws Exception {
+        final AtomicInteger counter = new AtomicInteger();
+
+        final ServiceRegistration ehReg = this.registerEventHandler(TOPIC, new EventHandler() {
+
+            @Override
+            public void handleEvent(final Event event) {
+                if ( TOPIC.equals(event.getTopic()) ) {
+                    counter.incrementAndGet();
+                }
+            }
+        });
+        try {
+            // send periodic timed event identified by "id"
+            final Dictionary<String, Object> props = new Hashtable<String, Object>();
+            props.put(EventUtil.PROPERTY_TIMED_EVENT_TOPIC, TOPIC);
+            props.put(EventUtil.PROPERTY_TIMED_EVENT_PERIOD, 1L); // NOTE(review): period unit is not visible here (presumably seconds) — confirm
+            props.put(EventUtil.PROPERTY_TIMED_EVENT_ID, "id");
+            this.eventAdmin.sendEvent(new Event(EventUtil.TOPIC_TIMED_EVENT, props));
+
+            while ( counter.get() < 5 ) { // polls until five deliveries were counted; only the test timeout bounds this loop
+                this.sleep(1000);
+            }
+            final Dictionary<String, Object> props2 = new Hashtable<String, Object>();
+            props2.put(EventUtil.PROPERTY_TIMED_EVENT_TOPIC, TOPIC);
+            props2.put(EventUtil.PROPERTY_TIMED_EVENT_ID, "id");
+
+            this.eventAdmin.sendEvent(new Event(EventUtil.TOPIC_TIMED_EVENT, props2)); // same id, no date or period — presumably cancels the periodic event (see the failure message below)
+            int current = counter.get();
+            this.sleep(2000);
+            if ( counter.get() != current && counter.get() != current + 1 ) { // at most one additional delivery is tolerated after the second event
+                fail("Events are still sent");
+            }
+        } finally {
+            ehReg.unregister();
+        }
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DeprecatedTimedJobsTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DeprecatedTimedJobsTest.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DeprecatedTimedJobsTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java?rev=1530585&r1=1530584&r2=1530585&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java Wed Oct  9 12:39:10 2013
@@ -18,17 +18,15 @@
  */
 package org.apache.sling.event.it;
 
-import static org.junit.Assert.fail;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.Date;
-import java.util.Dictionary;
-import java.util.Hashtable;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import javax.inject.Inject;
-
-import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -36,9 +34,6 @@ import org.ops4j.pax.exam.junit.PaxExam;
 import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
 import org.ops4j.pax.exam.spi.reactors.PerMethod;
 import org.osgi.framework.ServiceRegistration;
-import org.osgi.service.event.Event;
-import org.osgi.service.event.EventAdmin;
-import org.osgi.service.event.EventHandler;
 
 @RunWith(PaxExam.class)
 @ExamReactorStrategy(PerMethod.class)
@@ -46,9 +41,6 @@ public class TimedJobsTest extends Abstr
 
     private static final String TOPIC = "timed/test/topic";
 
-    @Inject
-    private EventAdmin eventAdmin;
-
     @Override
     @Before
     public void setup() throws IOException {
@@ -61,66 +53,29 @@ public class TimedJobsTest extends Abstr
     public void testTimedJob() throws Exception {
         final AtomicInteger counter = new AtomicInteger();
 
-        final ServiceRegistration ehReg = this.registerEventHandler(TOPIC, new EventHandler() {
+        final ServiceRegistration ehReg = this.registerJobConsumer(TOPIC, new JobConsumer() {
 
             @Override
-            public void handleEvent(final Event event) {
-                if ( TOPIC.equals(event.getTopic()) ) {
+            public JobResult process(final Job job) {
+                if ( job.getTopic().equals(TOPIC) ) {
                     counter.incrementAndGet();
                 }
+                return JobResult.OK;
             }
+
         });
         try {
             final Date d = new Date();
             d.setTime(System.currentTimeMillis() + 2000); // run in 2 seconds
-            // send timed event
-            final Dictionary<String, Object> props = new Hashtable<String, Object>();
-            props.put(EventUtil.PROPERTY_TIMED_EVENT_TOPIC, TOPIC);
-            props.put(EventUtil.PROPERTY_TIMED_EVENT_DATE, d);
-            this.eventAdmin.sendEvent(new Event(EventUtil.TOPIC_TIMED_EVENT, props));
 
-            while ( counter.get() == 0 ) {
-                this.sleep(1000);
-            }
-        } finally {
-            ehReg.unregister();
-        }
-    }
+            // create scheduled job
+            assertTrue(this.getJobManager().createJob(TOPIC).schedule("simpleTest").at(d));
 
-    @Test(timeout = DEFAULT_TEST_TIMEOUT)
-    public void testPeriodicTimedJob() throws Exception {
-        final AtomicInteger counter = new AtomicInteger();
-
-        final ServiceRegistration ehReg = this.registerEventHandler(TOPIC, new EventHandler() {
-
-            @Override
-            public void handleEvent(final Event event) {
-                if ( TOPIC.equals(event.getTopic()) ) {
-                    counter.incrementAndGet();
-                }
-            }
-        });
-        try {
-            // send timed event
-            final Dictionary<String, Object> props = new Hashtable<String, Object>();
-            props.put(EventUtil.PROPERTY_TIMED_EVENT_TOPIC, TOPIC);
-            props.put(EventUtil.PROPERTY_TIMED_EVENT_PERIOD, 1L);
-            props.put(EventUtil.PROPERTY_TIMED_EVENT_ID, "id");
-            this.eventAdmin.sendEvent(new Event(EventUtil.TOPIC_TIMED_EVENT, props));
-
-            while ( counter.get() < 5 ) {
+            while ( counter.get() == 0 ) {
                 this.sleep(1000);
             }
-            final Dictionary<String, Object> props2 = new Hashtable<String, Object>();
-            props2.put(EventUtil.PROPERTY_TIMED_EVENT_TOPIC, TOPIC);
-            props2.put(EventUtil.PROPERTY_TIMED_EVENT_ID, "id");
-
-            this.eventAdmin.sendEvent(new Event(EventUtil.TOPIC_TIMED_EVENT, props2));
-            int current = counter.get();
-            this.sleep(2000);
-            if ( counter.get() != current && counter.get() != current + 1 ) {
-                fail("Events are still sent");
-            }
+            this.sleep(1000);
+            assertEquals(0, this.getJobManager().getScheduledJobs().size());
         } finally {
             ehReg.unregister();
         }