You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/04/22 13:42:55 UTC

svn commit: r1470462 [1/7] - in /sling/trunk/bundles/extensions/event: ./ src/main/java/org/apache/sling/event/ src/main/java/org/apache/sling/event/impl/ src/main/java/org/apache/sling/event/impl/dea/ src/main/java/org/apache/sling/event/impl/jobs/ sr...

Author: cziegeler
Date: Mon Apr 22 11:42:53 2013
New Revision: 1470462

URL: http://svn.apache.org/r1470462
Log:
SLING-2832 : Use resource API 
SLING-2831 : Use new discovery API
SLING-2830 : Discontinue per job configurations for queue processing 
SLING-2829 : Add API for starting a job and service interface for executing a job 

Added:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventAdminConfiguration.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventReceiver.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventSender.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/EventAdminBridge.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifier.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusProviderImpl.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobsIteratorImpl.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jmx/QueueStatusEvent.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsImpl.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/TopicStatisticsImpl.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/timed/
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/timed/ScheduleInfo.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/timed/TimedEventConfiguration.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/timed/TimedEventReceiver.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/timed/TimedEventSender.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ExactTopicMatcher.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/PackageTopicMatcher.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/SubPackagesTopicMatcher.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/TopicMatcher.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/TopicMatcherHelper.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java   (with props)
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobConsumer.java   (with props)
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/dea/
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/dea/DistributingEventHandlerTest.java   (with props)
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java   (with props)
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java   (with props)
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DropQueueTest.java   (with props)
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/IgnoreQueueTest.java   (with props)
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java   (with props)
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java   (with props)
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java   (with props)
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java   (with props)
Removed:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobEvent.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobStatusNotifier.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobsIteratorImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/QueueStatusEvent.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopicStatisticsImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractTest.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/RepositoryTestUtil.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleEventAdmin.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleScheduler.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/AbstractJobEventHandlerTest.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/DropQueueTest.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/ExtendedJobManager.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/IgnoreQueueTest.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/JobEventHandlerTest.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/OrderedJobQueueTest.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/RoundRobinQueueTest.java
Modified:
    sling/trunk/bundles/extensions/event/pom.xml
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventPropertiesMap.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/TimedEventStatusProvider.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EnvironmentComponent.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jmx/AllJobStatisticsMBean.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jmx/QueuesMBeanImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobProcessor.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobUtil.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobsIterator.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java
    sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/EventUtilTest.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/StatisticsImplTest.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/UtilityTest.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/jmx/QueuesMBeanImplTest.java

Modified: sling/trunk/bundles/extensions/event/pom.xml
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/pom.xml?rev=1470462&r1=1470461&r2=1470462&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/pom.xml (original)
+++ sling/trunk/bundles/extensions/event/pom.xml Mon Apr 22 11:42:53 2013
@@ -45,8 +45,13 @@
     <properties>
         <site.jira.version.id>12315369</site.jira.version.id>
         <sling.java.version>6</sling.java.version>
+        <exam.version>2.5.0</exam.version>
+        <url.version>1.4.0</url.version>
+         <bundle.build.name>${basedir}/target</bundle.build.name>
+        <bundle.file.name>${bundle.build.name}/${project.build.finalName}.jar</bundle.file.name>
     </properties>
-
+ 
+ 
     <build>
         <plugins>
             <plugin>
@@ -68,7 +73,7 @@
                         </Import-Package>
                         <Export-Package>
                             org.apache.sling.event;version=2.4.0,
-                            org.apache.sling.event.jobs;version=1.1.0,
+                            org.apache.sling.event.jobs;version=1.2.0,
                             org.apache.sling.event.jobs.jmx;version=1.0.0
                         </Export-Package>
                         <Private-Package>
@@ -87,6 +92,60 @@
                     </instructions>
                 </configuration>
             </plugin>
+            <!-- plain unit tests -->
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.12</version>
+                <configuration>
+                    <excludes>
+                        <exclude>**/it/**</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+            <!-- integration tests run with pax-exam -->
+            <plugin>
+                <artifactId>maven-failsafe-plugin</artifactId>
+                <version>2.12</version>
+                <executions>
+                  <execution>
+                    <goals>
+                      <goal>integration-test</goal>
+                      <goal>verify</goal>
+                    </goals>
+                  </execution>
+                </executions>
+                <configuration>
+                    <systemProperties>
+                        <property>
+                            <name>project.bundle.file</name>
+                            <value>${bundle.file.name}</value>
+                        </property>
+                    </systemProperties>
+                    <argLine>
+                        -Xmx1024m -XX:MaxPermSize=368m
+                    </argLine>
+                    <includes>
+                        <include>**/it/*</include>
+                    </includes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-clean-plugin</artifactId>
+                <version>2.5</version>
+                <configuration>
+                    <filesets>
+                        <fileset>
+                            <directory>${basedir}</directory>
+                            <includes>
+                                <include>derby.log</include>
+                            </includes>
+                        </fileset>
+                        <fileset>
+                            <directory>jackrabbit</directory>
+                        </fileset>
+                    </filesets>
+                </configuration>
+            </plugin>
         </plugins>
     </build>
     <reporting>
@@ -103,35 +162,29 @@
         </plugins>
     </reporting>
     <dependencies>
-      <!--  We have to put this test dependency first, to make sure this version is
-            used for testing! -->
-        <dependency>
-            <groupId>commons-io</groupId>
-            <artifactId>commons-io</artifactId>
-            <version>1.4</version>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>org.apache.felix</groupId>
             <artifactId>org.apache.felix.scr.annotations</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.discovery.api</artifactId>
+            <version>0.1.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
         </dependency>
         <dependency>
             <groupId>org.osgi</groupId>
             <artifactId>org.osgi.core</artifactId>
+            <version>4.2.0</version>
         </dependency>
         <dependency>
             <groupId>org.osgi</groupId>
             <artifactId>org.osgi.compendium</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>javax.jcr</groupId>
-            <artifactId>jcr</artifactId>
-            <version>2.0</version>
-            <scope>provided</scope>
+            <version>4.2.0</version>
         </dependency>
         <dependency>
             <groupId>org.apache.sling</groupId>
@@ -141,8 +194,8 @@
         </dependency>
         <dependency>
             <groupId>org.apache.sling</groupId>
-            <artifactId>org.apache.sling.jcr.api</artifactId>
-            <version>2.0.6</version>
+            <artifactId>org.apache.sling.api</artifactId>
+            <version>2.4.0</version>
             <scope>provided</scope>
         </dependency>
         <dependency>
@@ -182,12 +235,6 @@
         </dependency>
       <!-- Testing -->
         <dependency>
-            <groupId>org.apache.sling</groupId>
-            <artifactId>org.apache.sling.jcr.resource</artifactId>
-            <version>2.0.6</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
         </dependency>
@@ -211,11 +258,68 @@
             <version>1.9.5</version>
             <scope>test</scope>
         </dependency>
+
+
+        <dependency>
+            <groupId>org.ops4j.pax.exam</groupId>
+            <artifactId>pax-exam-container-native</artifactId>
+            <version>${exam.version}</version>
+            <scope>test</scope>
+        </dependency>
+ 
         <dependency>
-            <groupId>org.apache.jackrabbit</groupId>
-            <artifactId>jackrabbit-core</artifactId>
-            <version>2.2.4</version>
+            <groupId>org.ops4j.pax.exam</groupId>
+            <artifactId>pax-exam-junit4</artifactId>
+            <version>${exam.version}</version>
             <scope>test</scope>
         </dependency>
+ 
+        <dependency>
+            <groupId>org.ops4j.pax.exam</groupId>
+            <artifactId>pax-exam-link-mvn</artifactId>
+            <version>${exam.version}</version>
+            <scope>test</scope>
+        </dependency>
+ 
+        <dependency>
+            <groupId>org.ops4j.pax.url</groupId>
+            <artifactId>pax-url-aether</artifactId>
+            <version>${url.version}</version>
+            <scope>test</scope>
+        </dependency>
+ 
+        <dependency>
+            <groupId>org.apache.felix</groupId>
+            <artifactId>org.apache.felix.framework</artifactId>
+            <version>3.2.2</version>
+            <scope>test</scope>
+        </dependency>
+ 
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <version>0.9.20</version>
+            <scope>test</scope>
+        </dependency>
+ 
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>0.9.20</version>
+            <scope>test</scope>
+        </dependency>
+         <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.launchpad.api</artifactId>
+            <version>1.1.0</version>
+            <scope>test</scope>
+        </dependency>
+         <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.testing.resourceresolver-mock</artifactId>
+            <version>0.1.0</version>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 </project>

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventPropertiesMap.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventPropertiesMap.java?rev=1470462&r1=1470461&r2=1470462&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventPropertiesMap.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventPropertiesMap.java Mon Apr 22 11:42:53 2013
@@ -78,6 +78,7 @@ public class EventPropertiesMap
     /**
      * @see java.util.Map#clear()
      */
+    @Override
     public void clear() {
         delegatee.clear();
     }
@@ -85,6 +86,7 @@ public class EventPropertiesMap
     /**
      * @see java.util.Map#containsKey(java.lang.Object)
      */
+    @Override
     public boolean containsKey(Object key) {
         return delegatee.containsKey(key);
     }
@@ -92,6 +94,7 @@ public class EventPropertiesMap
     /**
      * @see java.util.Map#containsValue(java.lang.Object)
      */
+    @Override
     public boolean containsValue(Object value) {
         return delegatee.containsValue(value);
     }
@@ -99,6 +102,7 @@ public class EventPropertiesMap
     /**
      * @see java.util.Map#entrySet()
      */
+    @Override
     public Set<java.util.Map.Entry<String, Object>> entrySet() {
         return delegatee.entrySet();
     }
@@ -106,13 +110,18 @@ public class EventPropertiesMap
     /**
      * @see java.lang.Object#equals(java.lang.Object)
      */
+    @Override
     public boolean equals(Object o) {
+        if ( o instanceof EventPropertiesMap ) {
+            return delegatee.equals(((EventPropertiesMap)o).delegatee);
+        }
         return delegatee.equals(o);
     }
 
     /**
      * @see java.util.Dictionary#get(java.lang.Object)
      */
+    @Override
     public Object get(Object key) {
         return delegatee.get(key);
     }
@@ -120,6 +129,7 @@ public class EventPropertiesMap
     /**
      * @see java.lang.Object#hashCode()
      */
+    @Override
     public int hashCode() {
         return delegatee.hashCode();
     }
@@ -127,6 +137,7 @@ public class EventPropertiesMap
     /**
      * @see java.util.Dictionary#isEmpty()
      */
+    @Override
     public boolean isEmpty() {
         return delegatee.isEmpty();
     }
@@ -134,6 +145,7 @@ public class EventPropertiesMap
     /**
      * @see java.util.Map#keySet()
      */
+    @Override
     public Set<String> keySet() {
         return delegatee.keySet();
     }
@@ -141,6 +153,7 @@ public class EventPropertiesMap
     /**
      * @see java.util.Dictionary#put(java.lang.Object, java.lang.Object)
      */
+    @Override
     public Object put(String key, Object value) {
         return delegatee.put(key, value);
     }
@@ -148,6 +161,7 @@ public class EventPropertiesMap
     /**
      * @see java.util.Map#putAll(java.util.Map)
      */
+    @Override
     public void putAll(Map<? extends String, ? extends Object> t) {
         delegatee.putAll(t);
     }
@@ -155,6 +169,7 @@ public class EventPropertiesMap
     /**
      * @see java.util.Dictionary#remove(java.lang.Object)
      */
+    @Override
     public Object remove(Object key) {
         return delegatee.remove(key);
     }
@@ -162,6 +177,7 @@ public class EventPropertiesMap
     /**
      * @see java.util.Dictionary#size()
      */
+    @Override
     public int size() {
         return delegatee.size();
     }
@@ -169,6 +185,7 @@ public class EventPropertiesMap
     /**
      * @see java.util.Map#values()
      */
+    @Override
     public Collection<Object> values() {
         return delegatee.values();
     }
@@ -176,6 +193,7 @@ public class EventPropertiesMap
     /**
      * @see java.util.Dictionary#elements()
      */
+    @Override
     public Enumeration<Object> elements() {
         return Collections.enumeration(this.values());
     }
@@ -183,6 +201,7 @@ public class EventPropertiesMap
     /**
      * @see java.util.Dictionary#keys()
      */
+    @Override
     public Enumeration<String> keys() {
         return Collections.enumeration(this.keySet());
     }
@@ -190,6 +209,7 @@ public class EventPropertiesMap
     /**
      * @see java.lang.Object#toString()
      */
+    @Override
     public String toString() {
         return this.delegatee.toString();
     }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java?rev=1470462&r1=1470461&r2=1470462&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java Mon Apr 22 11:42:53 2013
@@ -165,13 +165,13 @@ public abstract class EventUtil {
     /** The property for the unique event id. */
     public static final String PROPERTY_TIMED_EVENT_ID = "event.timed.id";
 
-    /** The scheduler expression for the timed event. */
+    /** The scheduler cron expression for the timed event. Type must be String. */
     public static final String PROPERTY_TIMED_EVENT_SCHEDULE = "event.timed.scheduler";
 
-    /** The period for the timed event. */
+    /** The period in seconds for the timed event. Type must be Long. */
     public static final String PROPERTY_TIMED_EVENT_PERIOD = "event.timed.period";
 
-    /** The date for the timed event. */
+    /** The date for the timed event. Type must be Date.  */
     public static final String PROPERTY_TIMED_EVENT_DATE = "event.timed.date";
 
     /**

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/TimedEventStatusProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/TimedEventStatusProvider.java?rev=1470462&r1=1470461&r2=1470462&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/TimedEventStatusProvider.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/TimedEventStatusProvider.java Mon Apr 22 11:42:53 2013
@@ -29,12 +29,12 @@ import org.osgi.service.event.Event;
 public interface TimedEventStatusProvider {
 
     /**
-     * This is a unique identifer which can be used to cancel the job.
+     * This is a unique identifier which can be used to cancel the job.
      */
     String PROPERTY_EVENT_ID = "slingevent:eventId";
 
     /**
-     * Return a list of currently schedulded events.
+     * Return a list of currently scheduled events.
      * @param topic Topic can be used as a filter, if it is non-null, only jobs with this topic will be returned.
      * @param filterProps A list of filter property maps. Each map acts like a template. The searched event
      *                    must match the template (AND query). By providing several maps, different filters
@@ -51,7 +51,7 @@ public interface TimedEventStatusProvide
 
     /**
      * Cancel this timed event.
-     * @param jobId The unique identifer as found in the property {@link #PROPERTY_EVENT_ID}.
+     * @param jobId The unique identifier as found in the property {@link #PROPERTY_EVENT_ID}.
      */
     void cancelTimedEvent(String jobId);
 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EnvironmentComponent.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EnvironmentComponent.java?rev=1470462&r1=1470461&r2=1470462&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EnvironmentComponent.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EnvironmentComponent.java Mon Apr 22 11:42:53 2013
@@ -18,9 +18,6 @@
  */
 package org.apache.sling.event.impl;
 
-import javax.jcr.RepositoryException;
-import javax.jcr.Session;
-
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -31,25 +28,20 @@ import org.apache.felix.scr.annotations.
 import org.apache.sling.commons.classloader.DynamicClassLoaderManager;
 import org.apache.sling.commons.threads.ThreadPool;
 import org.apache.sling.event.impl.support.Environment;
-import org.apache.sling.jcr.api.SlingRepository;
 import org.apache.sling.settings.SlingSettingsService;
-import org.osgi.service.event.EventAdmin;
 
 /**
  * Environment component. This component provides "global settings"
  * to all services, like the application id and the thread pool.
  * @since 3.0
+ *
+ * This component needs to be immediate to set the global variables
+ * (application id and thread pool).
  */
-@Component()
+@Component(immediate=true)
 @Service(value=EnvironmentComponent.class)
 public class EnvironmentComponent {
 
-    @Reference
-    private SlingRepository repository;
-
-    @Reference
-    private EventAdmin eventAdmin;
-
     @Reference(policy=ReferencePolicy.DYNAMIC,cardinality=ReferenceCardinality.OPTIONAL_UNARY)
     private DynamicClassLoaderManager classLoaderManager;
 
@@ -74,7 +66,7 @@ public class EnvironmentComponent {
     }
 
     /**
-     * Dectivate this component.
+     * Deactivate this component.
      */
     @Deactivate
     protected void deactivate() {
@@ -84,41 +76,6 @@ public class EnvironmentComponent {
         }
     }
 
-    /**
-     * Return the dynamic classloader for loading events from the repository.
-     */
-    public ClassLoader getDynamicClassLoader() {
-        final DynamicClassLoaderManager dclm = this.classLoaderManager;
-        if ( dclm != null ) {
-            return dclm.getDynamicClassLoader();
-        }
-        // if we don't have a dynamic classloader, we return our classloader
-        return this.getClass().getClassLoader();
-    }
-
-    /**
-     * Create a new admin session.
-     * @return A new admin session.
-     * @throws RepositoryException
-     */
-    public Session createAdminSession()
-    throws RepositoryException {
-        final SlingRepository repo = this.repository;
-        if ( repo == null ) {
-            // as the repo is a hard dependency for this service, the repo
-            // is always available, but we check for null anyway!
-            throw new RepositoryException("Repository is currently not available.");
-        }
-        return repo.loginAdministrative(null);
-    }
-
-    /**
-     * Return the event admin.
-     */
-    public EventAdmin getEventAdmin() {
-        return this.eventAdmin;
-    }
-
     protected void bindThreadPool(final EventingThreadPool etp) {
         this.threadPool = etp;
     }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java?rev=1470462&r1=1470461&r2=1470462&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java Mon Apr 22 11:42:53 2013
@@ -18,7 +18,11 @@
  */
 package org.apache.sling.event.impl;
 
+import java.util.Map;
+
+import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.PropertyOption;
 import org.apache.felix.scr.annotations.Reference;
@@ -29,7 +33,6 @@ import org.apache.sling.commons.threads.
 import org.apache.sling.commons.threads.ThreadPoolConfig;
 import org.apache.sling.commons.threads.ThreadPoolConfig.ThreadPriority;
 import org.apache.sling.commons.threads.ThreadPoolManager;
-import org.osgi.service.component.ComponentContext;
 
 
 /**
@@ -60,37 +63,39 @@ public class EventingThreadPool implemen
 
     /**
      * Activate this component.
-     * @param context
      */
-    protected void activate(final ComponentContext ctx) {
+    @Activate
+    protected void activate(final Map<String, Object> props) {
         final ModifiableThreadPoolConfig config = new ModifiableThreadPoolConfig();
-        config.setMinPoolSize(PropertiesUtil.toInteger(ctx.getProperties().get(PROPERTY_POOL_SIZE), DEFAULT_POOL_SIZE));
+        config.setMinPoolSize(PropertiesUtil.toInteger(props.get(PROPERTY_POOL_SIZE), DEFAULT_POOL_SIZE));
         config.setMaxPoolSize(config.getMinPoolSize());
         config.setQueueSize(-1); // unlimited
         config.setShutdownGraceful(true);
-        config.setPriority(ThreadPriority.valueOf(PropertiesUtil.toString(ctx.getProperties().get(PROPERTY_PRIORITY), "NORM")));
+        config.setPriority(ThreadPriority.valueOf(PropertiesUtil.toString(props.get(PROPERTY_PRIORITY), "NORM")));
         config.setDaemon(true);
         this.threadPool = threadPoolManager.create(config, "Apache Sling Eventing Thread Pool");
     }
 
     /**
      * Deactivate this component.
-     * @param context
      */
-    protected void deactivate(final ComponentContext context) {
+    @Deactivate
+    protected void deactivate() {
         this.threadPoolManager.release(this.threadPool);
     }
 
     /**
      * @see org.apache.sling.commons.threads.ThreadPool#execute(java.lang.Runnable)
      */
-    public void execute(Runnable runnable) {
+    @Override
+    public void execute(final Runnable runnable) {
         threadPool.execute(runnable);
     }
 
     /**
      * @see org.apache.sling.commons.threads.ThreadPool#getConfiguration()
      */
+    @Override
     public ThreadPoolConfig getConfiguration() {
         return threadPool.getConfiguration();
     }
@@ -98,6 +103,7 @@ public class EventingThreadPool implemen
     /**
      * @see org.apache.sling.commons.threads.ThreadPool#getName()
      */
+    @Override
     public String getName() {
         return threadPool.getName();
     }

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventAdminConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventAdminConfiguration.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventAdminConfiguration.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventAdminConfiguration.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.dea;
+
+import java.util.Map;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.event.impl.EnvironmentComponent;
+import org.apache.sling.event.impl.support.Environment;
+
+/**
+ * This service wraps the configuration of the distributed event admin (storage paths and cleanup period).
+ */
+@Component(name="org.apache.sling.event.impl.DistributingEventHandler")
+@Service(value=DistributedEventAdminConfiguration.class)
+public class DistributedEventAdminConfiguration {
+
+    /** Default repository path. */
+    private static final String DEFAULT_REPOSITORY_PATH = "/var/eventing/distribution";
+
+    /** Name of the (private) property holding the path below which distributed events are stored. */
+    @Property(value=DEFAULT_REPOSITORY_PATH, propertyPrivate=true)
+    private static final String CONFIG_PROPERTY_REPOSITORY_PATH = "repository.path";
+
+    /** Default clean up time is 15 minutes. */
+    private static final int DEFAULT_CLEANUP_PERIOD = 15;
+
+    @Property(intValue=DEFAULT_CLEANUP_PERIOD, propertyPrivate=true)
+    private static final String CONFIG_PROPERTY_CLEANUP_PERIOD = "cleanup.period"; // value is in minutes
+
+    /** The cleanup period in minutes: everything older is removed (15min by default). */
+    private int cleanupPeriod = DEFAULT_CLEANUP_PERIOD;
+
+    /** The configured root path in the resource tree. */
+    private String rootPath;
+
+    private String rootPathWithSlash; // rootPath + "/"
+
+    private String ownRootPath; // rootPathWithSlash + Environment.APPLICATION_ID
+
+    private String ownRootPathSlash; // ownRootPath + "/"
+
+    @Reference
+    private EnvironmentComponent environment; // NOTE(review): never read in this class - presumably referenced so that Environment.APPLICATION_ID is set before activation; confirm
+
+    @Activate
+    protected void activate(final Map<String, Object> props) {
+        this.cleanupPeriod = PropertiesUtil.toInteger(props.get(CONFIG_PROPERTY_CLEANUP_PERIOD), DEFAULT_CLEANUP_PERIOD);
+        this.rootPath = PropertiesUtil.toString(props.get(
+                CONFIG_PROPERTY_REPOSITORY_PATH), DEFAULT_REPOSITORY_PATH);
+        this.rootPathWithSlash = this.rootPath.concat("/"); // all derived paths are computed once here
+        this.ownRootPath = this.rootPathWithSlash.concat(Environment.APPLICATION_ID); // sub tree of this instance
+        this.ownRootPathSlash = this.ownRootPath.concat("/");
+    }
+
+    /**
+     * This is the root path for all events.
+     * @return The path ending with a slash.
+     */
+    public String getRootPathWithSlash() {
+        return this.rootPathWithSlash;
+    }
+
+    /**
+     * This is the root path for all events.
+     * @return The path as configured, without the appended slash.
+     */
+    public String getRootPath() {
+        return this.rootPath;
+    }
+
+    /**
+     * This is the root path for all events of this instance.
+     * @return The path without the appended slash.
+     */
+    public String getOwnRootPath() {
+        return this.ownRootPath;
+    }
+
+    /**
+     * This is the root path for all events of this instance.
+     * @return The path ending with a slash.
+     */
+    public String getOwnRootPathWithSlash() {
+        return this.ownRootPathSlash;
+    }
+
+    public int getCleanupPeriod() { // the cleanup period in minutes as read during activation
+        return this.cleanupPeriod;
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventAdminConfiguration.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventAdminConfiguration.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventAdminConfiguration.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventReceiver.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventReceiver.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventReceiver.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventReceiver.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.dea;
+
+import java.util.Calendar;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Properties;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyEvent.Type;
+import org.apache.sling.discovery.TopologyEventListener;
+import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.jobs.JobUtil;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import aQute.bnd.annotation.component.Deactivate;
+
+/**
+ * This is the distributed event receiver.
+ * It listens for all distributable events and stores them in the
+ * repository for other cluster instances to pick them up.
+ *
+ * It is registered as a periodic Runnable as well (see scheduler.period) to clean up
+ * obsolete events and the trees of instances which are not part of the topology anymore.
+ */
+@Component(immediate=true)
+@Service(value={EventHandler.class, Runnable.class, TopologyEventListener.class})
+@Properties({
+    @Property(name=EventConstants.EVENT_TOPIC, value="*", propertyPrivate=true),
+    @Property(name=EventConstants.EVENT_FILTER, value="(event.distribute=*)", propertyPrivate=true),
+    @Property(name="scheduler.period", longValue=1800, propertyPrivate=true),
+    @Property(name="scheduler.concurrent", boolValue=false, propertyPrivate=true),
+})
+public class DistributedEventReceiver
+    implements EventHandler, Runnable, TopologyEventListener {
+
+    /** Default logger. */
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    @Reference
+    private DistributedEventAdminConfiguration config;
+
+    /** Is the background task still running? */
+    private volatile boolean running;
+
+    /** A local queue for writing received events into the repository. */
+    private final BlockingQueue<Event> writeQueue = new LinkedBlockingQueue<Event>();
+
+    @Reference
+    private ResourceResolverFactory resourceResolverFactory;
+
+    /** Resolver used for writing. */
+    private ResourceResolver writerResolver;
+
+    /** The current instances if we're the leader; set by topology events, consumed by the periodic cleanup (hence volatile). */
+    private volatile Set<String> instances;
+
+    @Activate
+    protected void activate() {
+        this.running = true;
+        // start the writer thread: it creates its own resolver, processes the write queue and closes the resolver at the end
+        final Thread writerThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    writerResolver = resourceResolverFactory.getAdministrativeResourceResolver(null);
+                    ResourceHelper.getOrCreateBasePath(writerResolver, config.getOwnRootPath());
+                } catch (final Exception e) {
+                    // there is nothing we can do except log!
+                    logger.error("Error during resource resolver creation.", e);
+                    running = false;
+                }
+                try {
+                    processWriteQueue();
+                } catch (final Throwable t) { //NOSONAR
+                    logger.error("Writer thread stopped with exception: " + t.getMessage(), t);
+                    running = false;
+                }
+                if ( writerResolver != null ) {
+                    writerResolver.close();
+                    writerResolver = null;
+                }
+            }
+        });
+        writerThread.start();
+    }
+
+    /**
+     * Deactivate this component; the annotation is fully qualified as this file imports another Deactivate type.
+     */
+    @org.apache.felix.scr.annotations.Deactivate
+    protected void deactivate() {
+        // stop background threads by putting empty objects into the queue
+        this.running = false;
+        try {
+            this.writeQueue.put(new Event("some", (Dictionary<String, Object>)null));
+        } catch (final InterruptedException e) {
+            this.ignoreException(e);
+        }
+    }
+
+    /**
+     * Loop of the writer thread: takes events from the queue and writes them into the resource tree.
+     */
+    private void processWriteQueue() {
+        while ( this.running ) {
+            // so let's wait/get the next event from the queue
+            Event event = null;
+            try {
+                event = this.writeQueue.take();
+            } catch (final InterruptedException e) {
+                // we ignore this
+                this.ignoreException(e);
+            }
+            if ( event != null && this.running ) {
+                try {
+                    this.writeEvent(event);
+                } catch (final Exception e) {
+                    this.logger.error("Exception during writing the event to the resource tree.", e);
+                }
+            }
+        }
+    }
+
+    /** Counter used to create unique resource names for the written events. */
+    private final AtomicLong eventCounter = new AtomicLong(0);
+
+    /**
+     * Write an event to the resource tree below (own root)/year/month/day/hour/minute/event-(counter).
+     * @param event The event
+     * @throws PersistenceException If creating the resource fails
+     */
+    private void writeEvent(final Event event)
+    throws PersistenceException {
+        final Calendar now = Calendar.getInstance();
+
+        final StringBuilder sb = new StringBuilder(this.config.getOwnRootPathWithSlash());
+        sb.append(now.get(Calendar.YEAR));
+        sb.append('/');
+        sb.append(now.get(Calendar.MONTH) + 1);
+        sb.append('/');
+        sb.append(now.get(Calendar.DAY_OF_MONTH));
+        sb.append('/');
+        sb.append(now.get(Calendar.HOUR_OF_DAY));
+        sb.append('/');
+        sb.append(now.get(Calendar.MINUTE));
+        sb.append('/');
+        sb.append("event-");
+        sb.append(String.valueOf(eventCounter.getAndIncrement()));
+
+        // create properties
+        final Map<String, Object> properties = new HashMap<String, Object>();
+
+        final String[] propNames = event.getPropertyNames();
+        if ( propNames != null && propNames.length > 0 ) {
+            for(final String propName : propNames) {
+                if ( !ResourceHelper.ignoreProperty(propName) || JobUtil.JOB_ID.equals(propName) ) { // special handling for job id
+                    properties.put(propName, event.getProperty(propName));
+                }
+            }
+        }
+
+        properties.put(EventConstants.EVENT_TOPIC, event.getTopic());
+        properties.put(EventUtil.PROPERTY_APPLICATION, Environment.APPLICATION_ID);
+        properties.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, ResourceHelper.RESOURCE_TYPE_EVENT);
+
+        ResourceHelper.getOrCreateResource(this.writerResolver,
+                sb.toString(),
+                properties);
+    }
+
+    /**
+     * Queue the event for the writer thread; events are dropped once the writer thread has stopped.
+     * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
+     */
+    @Override
+    public void handleEvent(final Event event) {
+        // without a running writer thread nobody takes events out of the unbounded queue, it would just grow;
+        // offer() never blocks on this queue and - unlike put() - can't lose the event by an interrupt
+        if ( this.running ) {
+            this.writeQueue.offer(event);
+        }
+    }
+
+    /**
+     * Helper method which just logs the exception in debug mode.
+     * @param e
+     */
+    private void ignoreException(final Exception e) {
+        if ( this.logger.isDebugEnabled() ) {
+            this.logger.debug("Ignored exception " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * This method is invoked periodically.
+     * @see java.lang.Runnable#run()
+     */
+    @Override
+    public void run() {
+        this.cleanUpObsoleteInstances();
+        this.cleanUpObsoleteEvents();
+    }
+
+    /** If a set of instance ids is available, remove all trees below the root path not named after one of these ids. */
+    private void cleanUpObsoleteInstances() {
+        final Set<String> slingIds = this.instances;
+        if ( slingIds != null ) {
+            this.instances = null;
+            this.logger.debug("Checking for old instance trees for distributed events.");
+            ResourceResolver resolver = null;
+            try {
+                resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+
+                final Resource baseResource = resolver.getResource(this.config.getRootPathWithSlash());
+                // sanity check - should never be null
+                if ( baseResource != null ) {
+                    final Iterator<Resource> iter = baseResource.listChildren();
+                    while ( iter.hasNext() ) {
+                        final Resource rootResource = iter.next();
+                        if ( !slingIds.contains(rootResource.getName()) ) {
+                            resolver.delete(rootResource);
+                        }
+                    }
+                    resolver.commit();
+                }
+            } catch (final PersistenceException pe) {
+                // in the case of an error, we just log this as a warning
+                this.logger.warn("Exception during job resource tree cleanup.", pe);
+            } catch (final LoginException ignore) {
+                this.ignoreException(ignore);
+            } finally {
+                if ( resolver != null ) {
+                    resolver.close();
+                }
+            }
+        }
+    }
+
+    /** Remove all events of this instance which are older than the configured cleanup period (in minutes). */
+    private void cleanUpObsoleteEvents() {
+        if ( this.config.getCleanupPeriod() > 0 ) {
+            this.logger.debug("Cleaning up distributed events, removing all entries older than {} minutes.", this.config.getCleanupPeriod());
+            ResourceResolver resolver = null;
+            try {
+                resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+                final Resource baseResource = resolver.getResource(this.config.getOwnRootPath());
+                // sanity check - should never be null
+                if ( baseResource != null ) {
+                    final Calendar oldDate = Calendar.getInstance();
+                    oldDate.add(Calendar.MINUTE, -1 * this.config.getCleanupPeriod());
+                    // check years
+                    final int oldYear = oldDate.get(Calendar.YEAR);
+                    final Iterator<Resource> yearIter = baseResource.listChildren();
+                    while ( yearIter.hasNext() ) {
+                        final Resource yearResource = yearIter.next();
+                        final int year = toNumber(yearResource);
+                        if ( year < oldYear ) {
+                            resolver.delete(yearResource);
+                        } else if ( year == oldYear ) {
+                            // same year - check months
+                            final int oldMonth = oldDate.get(Calendar.MONTH) + 1;
+                            final Iterator<Resource> monthIter = yearResource.listChildren();
+                            while ( monthIter.hasNext() ) {
+                                final Resource monthResource = monthIter.next();
+                                final int month = toNumber(monthResource);
+                                if ( month < oldMonth ) {
+                                    resolver.delete(monthResource);
+                                } else if ( month == oldMonth ) {
+                                    // same month - check days
+                                    final int oldDay = oldDate.get(Calendar.DAY_OF_MONTH);
+                                    final Iterator<Resource> dayIter = monthResource.listChildren();
+                                    while ( dayIter.hasNext() ) {
+                                        final Resource dayResource = dayIter.next();
+                                        final int day = toNumber(dayResource);
+                                        if ( day < oldDay ) {
+                                            resolver.delete(dayResource);
+                                        } else if ( day == oldDay ) {
+                                            // same day - check hours
+                                            final int oldHour = oldDate.get(Calendar.HOUR_OF_DAY);
+                                            final Iterator<Resource> hourIter = dayResource.listChildren();
+                                            while ( hourIter.hasNext() ) {
+                                                final Resource hourResource = hourIter.next();
+                                                final int hour = toNumber(hourResource);
+                                                if ( hour < oldHour ) {
+                                                    resolver.delete(hourResource);
+                                                } else if ( hour == oldHour ) {
+                                                    // same hour - check minutes
+                                                    final int oldMinute = oldDate.get(Calendar.MINUTE);
+                                                    final Iterator<Resource> minuteIter = hourResource.listChildren();
+                                                    while ( minuteIter.hasNext() ) {
+                                                        final Resource minuteResource = minuteIter.next();
+                                                        if ( toNumber(minuteResource) < oldMinute ) {
+                                                            resolver.delete(minuteResource);
+                                                        }
+                                                    }
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+                resolver.commit();
+            } catch (final PersistenceException pe) {
+                // in the case of an error, we just log this as a warning
+                this.logger.warn("Exception during job resource tree cleanup.", pe);
+            } catch (final LoginException ignore) {
+                this.ignoreException(ignore);
+            } finally {
+                if ( resolver != null ) {
+                    resolver.close();
+                }
+            }
+        }
+    }
+
+    /** Parse the resource name as a number; other names yield Integer.MAX_VALUE and are therefore never removed. */
+    private static int toNumber(final Resource resource) {
+        try {
+            return Integer.parseInt(resource.getName());
+        } catch (final NumberFormatException nfe) {
+            return Integer.MAX_VALUE;
+        }
+    }
+
+    /**
+     * @see org.apache.sling.discovery.TopologyEventListener#handleTopologyEvent(org.apache.sling.discovery.TopologyEvent)
+     */
+    @Override
+    public void handleTopologyEvent(final TopologyEvent event) {
+        if ( event.getType() == Type.TOPOLOGY_CHANGING ) {
+            this.instances = null; // the view is changing, forget the collected ids
+        } else if ( event.getType() == Type.TOPOLOGY_CHANGED || event.getType() == Type.TOPOLOGY_INIT ) {
+            if ( event.getNewView().getLocalInstance().isLeader() ) {
+                final Set<String> set = new HashSet<String>(); // only the leader collects the ids for the cleanup
+                for(final InstanceDescription desc : event.getNewView().getInstances() ) {
+                    set.add(desc.getSlingId());
+                }
+                this.instances = set;
+            }
+        }
+    }
+}
+

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventReceiver.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventReceiver.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventReceiver.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventSender.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventSender.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventSender.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventSender.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.dea;
+
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Properties;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.api.SlingConstants;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.api.resource.ResourceUtil;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.jobs.JobUtil;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import aQute.bnd.annotation.component.Deactivate;
+
+/**
+ * This event handler distributes events across an application cluster.
+ *
+ * Paths queued for processing are handled by a background thread which reads the
+ * stored event from the resource tree and posts it via the event admin.
+ */
+@Component(immediate=true)
+@Service(value={EventHandler.class})
+@Properties({
+    @Property(name=EventConstants.EVENT_TOPIC, value=SlingConstants.TOPIC_RESOURCE_ADDED, propertyPrivate=true),
+})
+public class DistributedEventSender
+    implements EventHandler {
+
+    /** Default logger. */
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    /** Is the background task still running? */
+    private volatile boolean running;
+
+    /** A local queue for serializing the event processing. */
+    private final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
+
+    @Reference
+    private ResourceResolverFactory resourceResolverFactory;
+
+    @Reference
+    private DistributedEventAdminConfiguration config;
+
+    @Reference
+    private EventAdmin eventAdmin;
+
+    @Activate
+    protected void activate() {
+        this.running = true;
+        final Thread backgroundThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    runInBackground();
+                } catch (Throwable t) { //NOSONAR
+                    logger.error("Background thread stopped with exception: " + t.getMessage(), t);
+                    running = false;
+                }
+            }
+        });
+        backgroundThread.start();
+    }
+
+    /**
+     * Deactivate this component; the annotation is fully qualified as this file imports another Deactivate type.
+     */
+    @org.apache.felix.scr.annotations.Deactivate
+    protected void deactivate() {
+        // stop background threads by putting empty objects into the queue
+        this.running = false;
+        try {
+            this.queue.put("");
+        } catch (final InterruptedException e) {
+            this.ignoreException(e);
+        }
+    }
+
+    /**
+     * Recreate the event stored in the given resource.
+     * @return The event object or <code>null</code> if it has no topic, is marked with a read error or can't be created
+     */
+    private Event readEvent(final Resource eventResource) {
+        final ValueMap vm = ResourceUtil.getValueMap(eventResource);
+        final String topic = vm.get(EventConstants.EVENT_TOPIC, String.class);
+        final Map<String, Object> properties = ResourceHelper.cloneValueMap(vm);
+        if ( topic == null ) {
+            logger.error("Unable to read event: no topic found at {}", eventResource.getPath());
+        } else if ( properties.get(ResourceHelper.PROPERTY_MARKER_READ_ERROR) == null ) {
+            // only send event if there are no read errors, otherwise discard it
+            properties.remove(EventConstants.EVENT_TOPIC);
+            try {
+                // special handling for job notification jobs for compatibility
+                if ( topic.startsWith("org/apache/sling/event/notification/job/") ) {
+                    final String jobTopic = (String)properties.get(JobUtil.NOTIFICATION_PROPERTY_JOB_TOPIC);
+                    if ( jobTopic != null) {
+                        final Event jobEvent = new Event(jobTopic, properties);
+                        properties.put(JobUtil.PROPERTY_NOTIFICATION_JOB, jobEvent);
+                    }
+                }
+                return new Event(topic, properties);
+            } catch (final IllegalArgumentException iae) {
+                // this exception occurs if the topic is not correct (it should never happen,
+                // but you never know)
+                logger.error("Unable to read event: " + iae.getMessage(), iae);
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Background thread.
+     *
+     * As long as the running flag is set, this method takes paths from the queue. For
+     * each non empty path the resource is read with an administrative resource resolver
+     * and, if it is an event resource which can be read, the resulting event is posted
+     * through the event admin. The resolver is always closed afterwards.
+     */
+    private void runInBackground() {
+        while ( this.running ) {
+            // so let's wait/get the next event from the queue
+            String path = null;
+            try {
+                path = this.queue.take();
+            } catch (final InterruptedException e) {
+                // we ignore this
+                this.ignoreException(e);
+            }
+            // the empty path is the wake up signal put into the queue by deactivate()
+            if ( path != null && path.length() > 0 && this.running ) {
+                ResourceResolver resolver = null;
+                try {
+                    resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+                    final Resource eventResource = resolver.getResource(path);
+                    // the resource might not exist (anymore) - getResource returns null in
+                    // that case and dereferencing it would stop this thread
+                    if ( eventResource != null
+                         && eventResource.isResourceType(ResourceHelper.RESOURCE_TYPE_EVENT) ) {
+                        final Event e = this.readEvent(eventResource);
+                        if ( e != null ) {
+                            // we check event admin as processing is async
+                            final EventAdmin localEA = this.eventAdmin;
+                            if ( localEA != null ) {
+                                localEA.postEvent(e);
+                            } else {
+                                this.logger.error("Unable to post event as no event admin is available.");
+                            }
+                        }
+                    }
+                } catch (final LoginException ex) {
+                    this.logger.error("Exception during creation of resource resolver.", ex);
+                } finally {
+                    if ( resolver != null ) {
+                        resolver.close();
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Queue the path of the event for the background thread if the path starts with
+     * the configured root path but not with the configured own root path.
+     *
+     * @param event The received event; its path property may be missing
+     * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
+     */
+    @Override
+    public void handleEvent(final Event event) {
+        final String path = (String) event.getProperty(SlingConstants.PROPERTY_PATH);
+        if ( path != null
+             && path.startsWith(this.config.getRootPathWithSlash())
+             && !path.startsWith(this.config.getOwnRootPathWithSlash()) ) {
+
+            try {
+                this.queue.put(path);
+            } catch (final InterruptedException ex) {
+                // the path is dropped, but the interruption is kept visible for the
+                // thread delivering the event
+                this.ignoreException(ex);
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Helper method which just logs the exception in debug mode.
+     * @param e
+     */
+    private void ignoreException(final Exception e) {
+        if ( this.logger.isDebugEnabled() ) {
+            this.logger.debug("Ignored exception " + e.getMessage(), e);
+        }
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventSender.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventSender.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventSender.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import java.util.Calendar;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.jackrabbit.util.ISO8601;
+import org.apache.jackrabbit.util.ISO9075;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.ModifiableValueMap;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.QuerySyntaxException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.jobs.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Task for loading stored jobs from the repository.
+ *
+ * The constructor starts a daemon thread executing {@link #run()}. Once
+ * {@link #start()} has been called, that thread waits for the configured background
+ * load delay, queries the stored jobs whose target instance property equals
+ * {@link Environment#APPLICATION_ID} and afterwards processes the paths handed in
+ * through {@link #loadJob(String)} until {@link #stop()} or {@link #deactivate()}
+ * is called.
+ *
+ * TODO - we need better synching - especially something for loadJob(String)
+ */
+public class BackgroundLoader implements Runnable {
+
+    /** Marker put into the action queue by {@link #stop()} to wake up the loader thread. */
+    private static final String END_TOKEN = "*";
+
+    /** Default logger. */
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    /** The job manager configuration. */
+    private final JobManagerConfiguration configuration;
+
+    /** Resource resolver factory. */
+    private final ResourceResolverFactory resourceResolverFactory;
+
+    /** Is this still active? */
+    private final AtomicBoolean active = new AtomicBoolean(false);
+
+    /** Set by {@link #start()} and cleared by {@link #stop()} and {@link #deactivate()}. */
+    private volatile boolean running = false;
+
+    /** Job Manager implementation. */
+    private final JobManagerImpl jobManager;
+
+    /** Lock object for loading */
+    private final Object loadLock = new Object();
+
+    /** Lock object for stopping */
+    private final Object stopLock = new Object();
+
+    /** Unloaded jobs. All access is guarded by synchronizing on the set itself. */
+    private final Set<String> unloadedJobs = new HashSet<String>();
+
+    /** A local queue for handling new jobs. */
+    private final BlockingQueue<String> actionQueue = new LinkedBlockingQueue<String>();
+
+    /**
+     * Number of invocations of {@link #start()}. It is incremented while holding the
+     * load lock but read by the loader thread without that lock, therefore volatile.
+     */
+    private volatile long changeCount = 0;
+
+    /**
+     * Create and activate the loader.
+     *
+     * The loader thread is started immediately, but it does not load anything before
+     * {@link #start()} is called.
+     *
+     * @param jobManagerImpl The job manager used to read and process jobs
+     * @param configuration2 The job manager configuration
+     * @param resourceResolverFactory2 The factory used to create administrative resolvers
+     */
+    public BackgroundLoader(final JobManagerImpl jobManagerImpl,
+            final JobManagerConfiguration configuration2,
+            final ResourceResolverFactory resourceResolverFactory2) {
+        this.resourceResolverFactory = resourceResolverFactory2;
+        this.configuration = configuration2;
+        this.jobManager = jobManagerImpl;
+        this.active.set(true);
+        // start background thread
+        final Thread loaderThread = new Thread(this, "Apache Sling Job Background Loader");
+        loaderThread.setDaemon(true);
+        loaderThread.start();
+    }
+
+    /**
+     * Deactivate the loader.
+     *
+     * Clears the active and running flags and wakes up the loader thread on both
+     * locks and on the action queue so that it can terminate.
+     */
+    public void deactivate() {
+        this.active.set(false);
+        // make sure to stop background thread
+        synchronized ( this.loadLock ) {
+            this.running = false;
+            this.loadLock.notify();
+        }
+        this.stop();
+        synchronized ( this.stopLock ) {
+            this.stopLock.notify();
+        }
+    }
+
+    /**
+     * Helper method which just logs the exception in debug mode.
+     * @param e The exception to log
+     */
+    private void ignoreException(final Exception e) {
+        if ( this.logger.isDebugEnabled() ) {
+            this.logger.debug("Ignored exception " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Start the background loader process.
+     *
+     * Wakes up the loader thread and discards all queued paths and remembered
+     * unloaded jobs.
+     */
+    public void start() {
+        synchronized ( this.loadLock ) {
+            this.changeCount++;
+            this.running = true;
+            this.loadLock.notify();
+            // make sure to clear out old information
+            this.actionQueue.clear();
+            // all other accesses to the set are guarded by the set itself
+            synchronized ( this.unloadedJobs ) {
+                this.unloadedJobs.clear();
+            }
+        }
+    }
+
+    /**
+     * Stop the background loader process.
+     *
+     * Clears the running flag and puts the end token into the action queue to wake
+     * up a loader thread waiting for the next path.
+     */
+    public void stop() {
+        synchronized ( this.loadLock ) {
+            this.running = false;
+        }
+        // stop action queue
+        try {
+            this.actionQueue.put(END_TOKEN);
+        } catch (final InterruptedException e) {
+            this.ignoreException(e);
+        }
+    }
+
+    /**
+     * Restart if the instance is currently running.
+     */
+    public void restart() {
+        if ( this.isRunning() ) {
+            this.stop();
+            this.start();
+        }
+    }
+
+    /**
+     * Main loop of the loader thread.
+     *
+     * While the loader is active, each iteration waits for {@link #start()}, waits
+     * for the configured delay, loads the stored jobs and finally processes the
+     * action queue until the loader is stopped.
+     */
+    @Override
+    public void run() {
+        while ( this.active.get() ) {
+            final long startTime;
+            // we have to wait to get started
+            synchronized ( this.loadLock ) {
+                while ( this.active.get() && !this.running ) {
+                    try {
+                        this.loadLock.wait();
+                    } catch (final InterruptedException e) {
+                        // ignore
+                    }
+                }
+                startTime = System.currentTimeMillis();
+            }
+
+            // give the system some time to start
+            if ( this.isRunning() ) {
+                final long delayMillis = 1000L * this.configuration.getBackgroundLoadDelay();
+                // Object.wait(0) blocks until notified and a negative timeout is rejected
+                // with an exception, so only wait for a positive delay
+                if ( delayMillis > 0 ) {
+                    synchronized ( this.stopLock ) {
+                        try {
+                            this.stopLock.wait(delayMillis);
+                        } catch (final InterruptedException e) {
+                            // ignore
+                        }
+                    }
+                }
+            }
+
+            // load jobs from the resource tree
+            if ( this.isRunning() ) {
+                this.loadJobsInTheBackground(startTime);
+            }
+
+            // and finally process the action queue
+            while ( this.isRunning() ) {
+                String path = null;
+                try {
+                    path = this.actionQueue.take();
+                } catch (final InterruptedException e) {
+                    this.ignoreException(e);
+                }
+                if ( path != null && !END_TOKEN.equals(path) && this.isRunning() ) {
+                    this.loadJobFromPath(path);
+                }
+            }
+        }
+    }
+
+    /**
+     * Read the job stored at the path and hand it to the job manager; a job with
+     * read errors is remembered as unloaded instead.
+     *
+     * @param path The path taken from the action queue
+     */
+    private void loadJobFromPath(final String path) {
+        ResourceResolver resolver = null;
+        try {
+            resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+            final Resource resource = resolver.getResource(path);
+            // the resource might not exist (anymore) - getResource returns null in that
+            // case and dereferencing it would terminate the loader thread
+            if ( resource != null
+                 && ResourceHelper.RESOURCE_TYPE_JOB.equals(resource.getResourceType()) ) {
+                this.logger.debug("Reading local job from {}", path);
+                final JobImpl job = this.jobManager.readJob(resource);
+                if ( job != null ) {
+                    if ( job.hasReadErrors() ) {
+                        synchronized ( this.unloadedJobs ) {
+                            this.unloadedJobs.add(path);
+                        }
+                    } else {
+                        this.jobManager.process(job);
+                    }
+                }
+            }
+        } catch ( final LoginException le ) {
+            // administrative login should always work
+            this.ignoreException(le);
+        } finally {
+            if ( resolver != null ) {
+                resolver.close();
+            }
+        }
+    }
+
+    /**
+     * Load all active jobs from the resource tree.
+     *
+     * Queries, ordered by creation date, the job resources whose target instance
+     * property equals {@link Environment#APPLICATION_ID} and which were created
+     * before the given time. Loading is aborted as soon as the loader is stopped.
+     *
+     * @param startTime Only jobs created before this time (in milliseconds) are loaded
+     */
+    private void loadJobsInTheBackground(final long startTime) {
+        logger.debug("Starting background loading...");
+
+        ResourceResolver resolver = null;
+        long count = 0;
+        try {
+            resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+            final Calendar startDate = Calendar.getInstance();
+            startDate.setTimeInMillis(startTime);
+
+            final StringBuilder buf = new StringBuilder(64);
+
+            buf.append("//element(*,");
+            buf.append(ResourceHelper.RESOURCE_TYPE_JOB);
+            buf.append(")[@");
+            buf.append(ISO9075.encode(Job.PROPERTY_JOB_TARGET_INSTANCE));
+            buf.append(" = '");
+            buf.append(Environment.APPLICATION_ID);
+            buf.append("' and @");
+            buf.append(ISO9075.encode(Job.PROPERTY_JOB_CREATED));
+            buf.append(" < xs:dateTime('");
+            buf.append(ISO8601.format(startDate));
+            buf.append("')");
+            buf.append("] order by @");
+            buf.append(ISO9075.encode(Job.PROPERTY_JOB_CREATED));
+            buf.append(" ascending");
+            if ( this.isRunning() ) {
+                final Iterator<Resource> result = resolver.findResources(buf.toString(), "xpath");
+
+                while ( this.isRunning() && result.hasNext() ) {
+                    final Resource jobResource = result.next();
+                    if ( this.loadJobInTheBackground(jobResource) ) {
+                        count++;
+                    }
+                }
+            }
+        } catch (final QuerySyntaxException qse) {
+            this.ignoreException(qse);
+        } catch (final LoginException le) {
+            this.ignoreException(le);
+        } finally {
+            if ( resolver != null ) {
+                resolver.close();
+            }
+        }
+
+        logger.debug("Finished background loading of {} jobs.", count);
+    }
+
+    /**
+     * Load a single job from the resource tree.
+     *
+     * A job which has a processing start time is only loaded during the first run
+     * of the loader; its start time is then removed from the job and from the
+     * resource before the job is processed.
+     *
+     * @param jobResource The resource of the job
+     * @return <code>true</code> if the job has been handled (processed or remembered
+     *         as unloaded), <code>false</code> if it has been skipped
+     */
+    private boolean loadJobInTheBackground(final Resource jobResource) {
+        // sanity check for the path
+        if ( this.configuration.isLocalJob(jobResource.getPath()) ) {
+            final JobImpl job = this.jobManager.readJob(jobResource);
+            if ( job != null ) {
+                // check if the job is currently running
+                if ( this.changeCount == 1 || job.getProcessingStarted() == null ) {
+                    // reset started time
+                    if ( job.getProcessingStarted() != null && this.isRunning() ) {
+                        job.getProperties().remove(Job.PROPERTY_JOB_STARTED_TIME);
+                        // make sure to clear the started time
+                        try {
+                            final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
+                            // adaptTo returns null if the resource can not be modified
+                            if ( mvm != null ) {
+                                mvm.remove(Job.PROPERTY_JOB_STARTED_TIME);
+                                jobResource.getResourceResolver().commit();
+                            }
+                        } catch ( final PersistenceException ignore) {
+                            this.ignoreException(ignore);
+                        }
+                    }
+
+                    if ( job.hasReadErrors() ) {
+                        synchronized ( this.unloadedJobs ) {
+                            this.unloadedJobs.add(job.getResourcePath());
+                        }
+                    } else {
+                        if ( this.isRunning() ) {
+                            this.jobManager.process(job);
+                        }
+                    }
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Check whether the loader is active and has been started.
+     * @return <code>true</code> if both the active and the running flag are set
+     */
+    private boolean isRunning() {
+        return this.active.get() && this.running;
+    }
+
+    /**
+     * Try to reload unloaded jobs - this method is invoked if bundles have been added etc.
+     *
+     * The remembered paths are removed from the set of unloaded jobs and, if there
+     * are any, put into the action queue by a task executed in the thread pool.
+     * The paths are dropped if the loader is not running when that task executes.
+     */
+    public void tryToReloadUnloadedJobs() {
+        // bundle event started or updated
+        final Set<String> copyUnloadedJobs = new HashSet<String>();
+        synchronized ( this.unloadedJobs ) {
+            copyUnloadedJobs.addAll(this.unloadedJobs);
+            this.unloadedJobs.clear();
+        }
+        if ( copyUnloadedJobs.size() > 0 ) {
+            final Runnable t = new Runnable() {
+
+                @Override
+                public void run() {
+                    synchronized ( loadLock ) {
+                        if ( isRunning() ) {
+                            for ( final String path : copyUnloadedJobs ) {
+                                try {
+                                    actionQueue.put(path);
+                                } catch (final InterruptedException e) {
+                                    ignoreException(e);
+                                }
+                            }
+                        }
+                    }
+                }
+
+            };
+            Environment.THREAD_POOL.execute(t);
+        }
+    }
+
+    /**
+     * Put the path of a job into the action queue; it is read and processed by the
+     * loader thread once that thread processes the queue.
+     *
+     * @param path The resource path of the job
+     */
+    public void loadJob(final String path) {
+        try {
+            this.actionQueue.put(path);
+        } catch (final InterruptedException e) {
+            this.ignoreException(e);
+        }
+    }
+}

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java
------------------------------------------------------------------------------
    svn:keywords = author date id revision rev url

Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain