You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/04/22 13:42:55 UTC
svn commit: r1470462 [1/7] - in /sling/trunk/bundles/extensions/event: ./
src/main/java/org/apache/sling/event/
src/main/java/org/apache/sling/event/impl/
src/main/java/org/apache/sling/event/impl/dea/
src/main/java/org/apache/sling/event/impl/jobs/ sr...
Author: cziegeler
Date: Mon Apr 22 11:42:53 2013
New Revision: 1470462
URL: http://svn.apache.org/r1470462
Log:
SLING-2832 : Use resource API
SLING-2831 : Use new discovery API
SLING-2830 : Discontinue per job configurations for queue processing
SLING-2829 : Add API for starting a job and service interface for executing a job
Added:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventAdminConfiguration.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventReceiver.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventSender.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/MainQueueConfiguration.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/EventAdminBridge.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusNotifier.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobStatusProviderImpl.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/deprecated/JobsIteratorImpl.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jmx/QueueStatusEvent.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsImpl.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/stats/TopicStatisticsImpl.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/timed/
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/timed/ScheduleInfo.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/timed/TimedEventConfiguration.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/timed/TimedEventReceiver.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/timed/TimedEventSender.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ExactTopicMatcher.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/PackageTopicMatcher.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/SubPackagesTopicMatcher.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/TopicMatcher.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/TopicMatcherHelper.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java (with props)
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobConsumer.java (with props)
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/dea/
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/dea/DistributingEventHandlerTest.java (with props)
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java (with props)
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/ClassloadingTest.java (with props)
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/DropQueueTest.java (with props)
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/IgnoreQueueTest.java (with props)
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java (with props)
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java (with props)
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java (with props)
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/TimedJobsTest.java (with props)
Removed:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/AbstractRepositoryEventHandler.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/DistributingEventHandler.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/TimedJobHandler.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/DefaultJobManager.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobEvent.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobStatusNotifier.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobsIteratorImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/QueueStatusEvent.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/StatisticsImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopicStatisticsImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jcr/
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractRepositoryEventHandlerTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/AbstractTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/DistributingEventHandlerTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/RepositoryTestUtil.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleEventAdmin.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/SimpleScheduler.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/AbstractJobEventHandlerTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/DropQueueTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/ExtendedJobManager.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/IgnoreQueueTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/JobEventHandlerTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/OrderedJobQueueTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/RoundRobinQueueTest.java
Modified:
sling/trunk/bundles/extensions/event/pom.xml
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventPropertiesMap.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/TimedEventStatusProvider.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EnvironmentComponent.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/QueueConfigurationManager.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/console/WebConsolePlugin.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jmx/AllJobStatisticsMBean.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/jmx/QueuesMBeanImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractParallelJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/TopicRoundRobinJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobProcessor.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobUtil.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobsIterator.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java
sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/EventUtilTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/StatisticsImplTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/UtilityTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfigurationTest.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/impl/jobs/jmx/QueuesMBeanImplTest.java
Modified: sling/trunk/bundles/extensions/event/pom.xml
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/pom.xml?rev=1470462&r1=1470461&r2=1470462&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/pom.xml (original)
+++ sling/trunk/bundles/extensions/event/pom.xml Mon Apr 22 11:42:53 2013
@@ -45,8 +45,13 @@
<properties>
<site.jira.version.id>12315369</site.jira.version.id>
<sling.java.version>6</sling.java.version>
+ <exam.version>2.5.0</exam.version>
+ <url.version>1.4.0</url.version>
+ <bundle.build.name>${basedir}/target</bundle.build.name>
+ <bundle.file.name>${bundle.build.name}/${project.build.finalName}.jar</bundle.file.name>
</properties>
-
+
+
<build>
<plugins>
<plugin>
@@ -68,7 +73,7 @@
</Import-Package>
<Export-Package>
org.apache.sling.event;version=2.4.0,
- org.apache.sling.event.jobs;version=1.1.0,
+ org.apache.sling.event.jobs;version=1.2.0,
org.apache.sling.event.jobs.jmx;version=1.0.0
</Export-Package>
<Private-Package>
@@ -87,6 +92,60 @@
</instructions>
</configuration>
</plugin>
+ <!-- plain unit tests -->
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.12</version>
+ <configuration>
+ <excludes>
+ <exclude>**/it/**</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ <!-- integration tests run with pax-exam -->
+ <plugin>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>2.12</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <systemProperties>
+ <property>
+ <name>project.bundle.file</name>
+ <value>${bundle.file.name}</value>
+ </property>
+ </systemProperties>
+ <argLine>
+ -Xmx1024m -XX:MaxPermSize=368m
+ </argLine>
+ <includes>
+ <include>**/it/*</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-clean-plugin</artifactId>
+ <version>2.5</version>
+ <configuration>
+ <filesets>
+ <fileset>
+ <directory>${basedir}</directory>
+ <includes>
+ <include>derby.log</include>
+ </includes>
+ </fileset>
+ <fileset>
+ <directory>jackrabbit</directory>
+ </fileset>
+ </filesets>
+ </configuration>
+ </plugin>
</plugins>
</build>
<reporting>
@@ -103,35 +162,29 @@
</plugins>
</reporting>
<dependencies>
- <!-- We have to put this test dependency first, to make sure this version is
- used for testing! -->
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- <version>1.4</version>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.scr.annotations</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.discovery.api</artifactId>
+ <version>0.1.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.core</artifactId>
+ <version>4.2.0</version>
</dependency>
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.compendium</artifactId>
- </dependency>
- <dependency>
- <groupId>javax.jcr</groupId>
- <artifactId>jcr</artifactId>
- <version>2.0</version>
- <scope>provided</scope>
+ <version>4.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.sling</groupId>
@@ -141,8 +194,8 @@
</dependency>
<dependency>
<groupId>org.apache.sling</groupId>
- <artifactId>org.apache.sling.jcr.api</artifactId>
- <version>2.0.6</version>
+ <artifactId>org.apache.sling.api</artifactId>
+ <version>2.4.0</version>
<scope>provided</scope>
</dependency>
<dependency>
@@ -182,12 +235,6 @@
</dependency>
<!-- Testing -->
<dependency>
- <groupId>org.apache.sling</groupId>
- <artifactId>org.apache.sling.jcr.resource</artifactId>
- <version>2.0.6</version>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
@@ -211,11 +258,68 @@
<version>1.9.5</version>
<scope>test</scope>
</dependency>
+
+
+ <dependency>
+ <groupId>org.ops4j.pax.exam</groupId>
+ <artifactId>pax-exam-container-native</artifactId>
+ <version>${exam.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
- <groupId>org.apache.jackrabbit</groupId>
- <artifactId>jackrabbit-core</artifactId>
- <version>2.2.4</version>
+ <groupId>org.ops4j.pax.exam</groupId>
+ <artifactId>pax-exam-junit4</artifactId>
+ <version>${exam.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.ops4j.pax.exam</groupId>
+ <artifactId>pax-exam-link-mvn</artifactId>
+ <version>${exam.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.ops4j.pax.url</groupId>
+ <artifactId>pax-url-aether</artifactId>
+ <version>${url.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>org.apache.felix.framework</artifactId>
+ <version>3.2.2</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ <version>0.9.20</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>0.9.20</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.launchpad.api</artifactId>
+ <version>1.1.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.testing.resourceresolver-mock</artifactId>
+ <version>0.1.0</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project>
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventPropertiesMap.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventPropertiesMap.java?rev=1470462&r1=1470461&r2=1470462&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventPropertiesMap.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventPropertiesMap.java Mon Apr 22 11:42:53 2013
@@ -78,6 +78,7 @@ public class EventPropertiesMap
/**
* @see java.util.Map#clear()
*/
+ @Override
public void clear() {
delegatee.clear();
}
@@ -85,6 +86,7 @@ public class EventPropertiesMap
/**
* @see java.util.Map#containsKey(java.lang.Object)
*/
+ @Override
public boolean containsKey(Object key) {
return delegatee.containsKey(key);
}
@@ -92,6 +94,7 @@ public class EventPropertiesMap
/**
* @see java.util.Map#containsValue(java.lang.Object)
*/
+ @Override
public boolean containsValue(Object value) {
return delegatee.containsValue(value);
}
@@ -99,6 +102,7 @@ public class EventPropertiesMap
/**
* @see java.util.Map#entrySet()
*/
+ @Override
public Set<java.util.Map.Entry<String, Object>> entrySet() {
return delegatee.entrySet();
}
@@ -106,13 +110,18 @@ public class EventPropertiesMap
/**
* @see java.lang.Object#equals(java.lang.Object)
*/
+ @Override
public boolean equals(Object o) {
+ if ( o instanceof EventPropertiesMap ) {
+ return delegatee.equals(((EventPropertiesMap)o).delegatee);
+ }
return delegatee.equals(o);
}
/**
* @see java.util.Dictionary#get(java.lang.Object)
*/
+ @Override
public Object get(Object key) {
return delegatee.get(key);
}
@@ -120,6 +129,7 @@ public class EventPropertiesMap
/**
* @see java.lang.Object#hashCode()
*/
+ @Override
public int hashCode() {
return delegatee.hashCode();
}
@@ -127,6 +137,7 @@ public class EventPropertiesMap
/**
* @see java.util.Dictionary#isEmpty()
*/
+ @Override
public boolean isEmpty() {
return delegatee.isEmpty();
}
@@ -134,6 +145,7 @@ public class EventPropertiesMap
/**
* @see java.util.Map#keySet()
*/
+ @Override
public Set<String> keySet() {
return delegatee.keySet();
}
@@ -141,6 +153,7 @@ public class EventPropertiesMap
/**
* @see java.util.Dictionary#put(java.lang.Object, java.lang.Object)
*/
+ @Override
public Object put(String key, Object value) {
return delegatee.put(key, value);
}
@@ -148,6 +161,7 @@ public class EventPropertiesMap
/**
* @see java.util.Map#putAll(java.util.Map)
*/
+ @Override
public void putAll(Map<? extends String, ? extends Object> t) {
delegatee.putAll(t);
}
@@ -155,6 +169,7 @@ public class EventPropertiesMap
/**
* @see java.util.Dictionary#remove(java.lang.Object)
*/
+ @Override
public Object remove(Object key) {
return delegatee.remove(key);
}
@@ -162,6 +177,7 @@ public class EventPropertiesMap
/**
* @see java.util.Dictionary#size()
*/
+ @Override
public int size() {
return delegatee.size();
}
@@ -169,6 +185,7 @@ public class EventPropertiesMap
/**
* @see java.util.Map#values()
*/
+ @Override
public Collection<Object> values() {
return delegatee.values();
}
@@ -176,6 +193,7 @@ public class EventPropertiesMap
/**
* @see java.util.Dictionary#elements()
*/
+ @Override
public Enumeration<Object> elements() {
return Collections.enumeration(this.values());
}
@@ -183,6 +201,7 @@ public class EventPropertiesMap
/**
* @see java.util.Dictionary#keys()
*/
+ @Override
public Enumeration<String> keys() {
return Collections.enumeration(this.keySet());
}
@@ -190,6 +209,7 @@ public class EventPropertiesMap
/**
* @see java.lang.Object#toString()
*/
+ @Override
public String toString() {
return this.delegatee.toString();
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java?rev=1470462&r1=1470461&r2=1470462&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/EventUtil.java Mon Apr 22 11:42:53 2013
@@ -165,13 +165,13 @@ public abstract class EventUtil {
/** The property for the unique event id. */
public static final String PROPERTY_TIMED_EVENT_ID = "event.timed.id";
- /** The scheduler expression for the timed event. */
+ /** The scheduler cron expression for the timed event. Type must be String. */
public static final String PROPERTY_TIMED_EVENT_SCHEDULE = "event.timed.scheduler";
- /** The period for the timed event. */
+ /** The period in seconds for the timed event. Type must be Long. */
public static final String PROPERTY_TIMED_EVENT_PERIOD = "event.timed.period";
- /** The date for the timed event. */
+ /** The date for the timed event. Type must be Date. */
public static final String PROPERTY_TIMED_EVENT_DATE = "event.timed.date";
/**
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/TimedEventStatusProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/TimedEventStatusProvider.java?rev=1470462&r1=1470461&r2=1470462&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/TimedEventStatusProvider.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/TimedEventStatusProvider.java Mon Apr 22 11:42:53 2013
@@ -29,12 +29,12 @@ import org.osgi.service.event.Event;
public interface TimedEventStatusProvider {
/**
- * This is a unique identifer which can be used to cancel the job.
+ * This is a unique identifier which can be used to cancel the job.
*/
String PROPERTY_EVENT_ID = "slingevent:eventId";
/**
- * Return a list of currently schedulded events.
+ * Return a list of currently scheduled events.
* @param topic Topic can be used as a filter, if it is non-null, only jobs with this topic will be returned.
* @param filterProps A list of filter property maps. Each map acts like a template. The searched event
* must match the template (AND query). By providing several maps, different filters
@@ -51,7 +51,7 @@ public interface TimedEventStatusProvide
/**
* Cancel this timed event.
- * @param jobId The unique identifer as found in the property {@link #PROPERTY_EVENT_ID}.
+ * @param jobId The unique identifier as found in the property {@link #PROPERTY_EVENT_ID}.
*/
void cancelTimedEvent(String jobId);
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EnvironmentComponent.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EnvironmentComponent.java?rev=1470462&r1=1470461&r2=1470462&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EnvironmentComponent.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EnvironmentComponent.java Mon Apr 22 11:42:53 2013
@@ -18,9 +18,6 @@
*/
package org.apache.sling.event.impl;
-import javax.jcr.RepositoryException;
-import javax.jcr.Session;
-
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -31,25 +28,20 @@ import org.apache.felix.scr.annotations.
import org.apache.sling.commons.classloader.DynamicClassLoaderManager;
import org.apache.sling.commons.threads.ThreadPool;
import org.apache.sling.event.impl.support.Environment;
-import org.apache.sling.jcr.api.SlingRepository;
import org.apache.sling.settings.SlingSettingsService;
-import org.osgi.service.event.EventAdmin;
/**
* Environment component. This component provides "global settings"
* to all services, like the application id and the thread pool.
* @since 3.0
+ *
+ * This component needs to be immediate to set the global variables
+ * (application id and thread pool).
*/
-@Component()
+@Component(immediate=true)
@Service(value=EnvironmentComponent.class)
public class EnvironmentComponent {
- @Reference
- private SlingRepository repository;
-
- @Reference
- private EventAdmin eventAdmin;
-
@Reference(policy=ReferencePolicy.DYNAMIC,cardinality=ReferenceCardinality.OPTIONAL_UNARY)
private DynamicClassLoaderManager classLoaderManager;
@@ -74,7 +66,7 @@ public class EnvironmentComponent {
}
/**
- * Dectivate this component.
+ * Deactivate this component.
*/
@Deactivate
protected void deactivate() {
@@ -84,41 +76,6 @@ public class EnvironmentComponent {
}
}
- /**
- * Return the dynamic classloader for loading events from the repository.
- */
- public ClassLoader getDynamicClassLoader() {
- final DynamicClassLoaderManager dclm = this.classLoaderManager;
- if ( dclm != null ) {
- return dclm.getDynamicClassLoader();
- }
- // if we don't have a dynamic classloader, we return our classloader
- return this.getClass().getClassLoader();
- }
-
- /**
- * Create a new admin session.
- * @return A new admin session.
- * @throws RepositoryException
- */
- public Session createAdminSession()
- throws RepositoryException {
- final SlingRepository repo = this.repository;
- if ( repo == null ) {
- // as the repo is a hard dependency for this service, the repo
- // is always available, but we check for null anyway!
- throw new RepositoryException("Repository is currently not available.");
- }
- return repo.loginAdministrative(null);
- }
-
- /**
- * Return the event admin.
- */
- public EventAdmin getEventAdmin() {
- return this.eventAdmin;
- }
-
protected void bindThreadPool(final EventingThreadPool etp) {
this.threadPool = etp;
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java?rev=1470462&r1=1470461&r2=1470462&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/EventingThreadPool.java Mon Apr 22 11:42:53 2013
@@ -18,7 +18,11 @@
*/
package org.apache.sling.event.impl;
+import java.util.Map;
+
+import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.PropertyOption;
import org.apache.felix.scr.annotations.Reference;
@@ -29,7 +33,6 @@ import org.apache.sling.commons.threads.
import org.apache.sling.commons.threads.ThreadPoolConfig;
import org.apache.sling.commons.threads.ThreadPoolConfig.ThreadPriority;
import org.apache.sling.commons.threads.ThreadPoolManager;
-import org.osgi.service.component.ComponentContext;
/**
@@ -60,37 +63,39 @@ public class EventingThreadPool implemen
/**
* Activate this component.
- * @param context
*/
- protected void activate(final ComponentContext ctx) {
+ @Activate
+ protected void activate(final Map<String, Object> props) {
final ModifiableThreadPoolConfig config = new ModifiableThreadPoolConfig();
- config.setMinPoolSize(PropertiesUtil.toInteger(ctx.getProperties().get(PROPERTY_POOL_SIZE), DEFAULT_POOL_SIZE));
+ config.setMinPoolSize(PropertiesUtil.toInteger(props.get(PROPERTY_POOL_SIZE), DEFAULT_POOL_SIZE));
config.setMaxPoolSize(config.getMinPoolSize());
config.setQueueSize(-1); // unlimited
config.setShutdownGraceful(true);
- config.setPriority(ThreadPriority.valueOf(PropertiesUtil.toString(ctx.getProperties().get(PROPERTY_PRIORITY), "NORM")));
+ config.setPriority(ThreadPriority.valueOf(PropertiesUtil.toString(props.get(PROPERTY_PRIORITY), "NORM")));
config.setDaemon(true);
this.threadPool = threadPoolManager.create(config, "Apache Sling Eventing Thread Pool");
}
/**
* Deactivate this component.
- * @param context
*/
- protected void deactivate(final ComponentContext context) {
+ @Deactivate
+ protected void deactivate() {
this.threadPoolManager.release(this.threadPool);
}
/**
* @see org.apache.sling.commons.threads.ThreadPool#execute(java.lang.Runnable)
*/
- public void execute(Runnable runnable) {
+ @Override
+ public void execute(final Runnable runnable) {
threadPool.execute(runnable);
}
/**
* @see org.apache.sling.commons.threads.ThreadPool#getConfiguration()
*/
+ @Override
public ThreadPoolConfig getConfiguration() {
return threadPool.getConfiguration();
}
@@ -98,6 +103,7 @@ public class EventingThreadPool implemen
/**
* @see org.apache.sling.commons.threads.ThreadPool#getName()
*/
+ @Override
public String getName() {
return threadPool.getName();
}
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventAdminConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventAdminConfiguration.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventAdminConfiguration.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventAdminConfiguration.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.dea;
+
+import java.util.Map;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.event.impl.EnvironmentComponent;
+import org.apache.sling.event.impl.support.Environment;
+
+/**
+ * Holds the configuration of the distributed event admin and exposes the
+ * resource paths derived from it.
+ */
+@Component(name="org.apache.sling.event.impl.DistributingEventHandler")
+@Service(value=DistributedEventAdminConfiguration.class)
+public class DistributedEventAdminConfiguration {
+
+    /** Repository path used if no path is configured. */
+    private static final String DEFAULT_REPOSITORY_PATH = "/var/eventing/distribution";
+
+    /** Name of the property holding the root path for the distributed events. */
+    @Property(value=DEFAULT_REPOSITORY_PATH, propertyPrivate=true)
+    private static final String CONFIG_PROPERTY_REPOSITORY_PATH = "repository.path";
+
+    /** Clean up period in minutes used if no period is configured. */
+    private static final int DEFAULT_CLEANUP_PERIOD = 15;
+
+    /** Name of the property holding the clean up period. */
+    @Property(intValue=DEFAULT_CLEANUP_PERIOD, propertyPrivate=true)
+    private static final String CONFIG_PROPERTY_CLEANUP_PERIOD = "cleanup.period";
+
+    /** The clean up period in minutes, 15 by default. */
+    private int cleanupPeriod = DEFAULT_CLEANUP_PERIOD;
+
+    /** The root path in the resource tree as configured. */
+    private String rootPath;
+
+    /** The root path followed by a slash. */
+    private String rootPathWithSlash;
+
+    /** The root path followed by a slash and the application id. */
+    private String ownRootPath;
+
+    /** The own root path followed by a slash. */
+    private String ownRootPathSlash;
+
+    // NOTE(review): never read in this class - presumably referenced so that
+    // Environment.APPLICATION_ID is available before activation; confirm.
+    @Reference
+    private EnvironmentComponent environment;
+
+    /**
+     * Activate this component.
+     * Reads the clean up period and the root path from the configuration and
+     * derives all other paths from the root path.
+     * @param props The component configuration
+     */
+    @Activate
+    protected void activate(final Map<String, Object> props) {
+        final Object configuredPeriod = props.get(CONFIG_PROPERTY_CLEANUP_PERIOD);
+        cleanupPeriod = PropertiesUtil.toInteger(configuredPeriod, DEFAULT_CLEANUP_PERIOD);
+
+        final Object configuredPath = props.get(CONFIG_PROPERTY_REPOSITORY_PATH);
+        rootPath = PropertiesUtil.toString(configuredPath, DEFAULT_REPOSITORY_PATH);
+
+        // every derived path is built on top of the previous one
+        rootPathWithSlash = rootPath.concat("/");
+        ownRootPath = rootPathWithSlash.concat(Environment.APPLICATION_ID);
+        ownRootPathSlash = ownRootPath.concat("/");
+    }
+
+    /**
+     * This is the root path for all events.
+     * @return The path ending with a slash.
+     */
+    public String getRootPathWithSlash() {
+        return rootPathWithSlash;
+    }
+
+    /**
+     * This is the root path for all events.
+     * @return The path as configured (the default does not end with a slash).
+     */
+    public String getRootPath() {
+        return rootPath;
+    }
+
+    /**
+     * This is the root path for all events of this instance.
+     * @return The root path with the application id appended, no slash is added at the end.
+     */
+    public String getOwnRootPath() {
+        return ownRootPath;
+    }
+
+    /**
+     * This is the root path for all events of this instance.
+     * @return The path ending with a slash.
+     */
+    public String getOwnRootPathWithSlash() {
+        return ownRootPathSlash;
+    }
+
+    /**
+     * The clean up period.
+     * @return The period in minutes.
+     */
+    public int getCleanupPeriod() {
+        return cleanupPeriod;
+    }
+}
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventAdminConfiguration.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventAdminConfiguration.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventAdminConfiguration.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventReceiver.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventReceiver.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventReceiver.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventReceiver.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.dea;
+
+import java.util.Calendar;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Properties;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyEvent.Type;
+import org.apache.sling.discovery.TopologyEventListener;
+import org.apache.sling.event.EventUtil;
+import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.jobs.JobUtil;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import aQute.bnd.annotation.component.Deactivate;
+
+/**
+ * This is the distributed event receiver.
+ * It listens for all distributable events and stores them in the
+ * repository for other cluster instances to pick them up.
+ *
+ * We schedule this event handler to run in the background and clean up
+ * obsolete events.
+ */
+@Component(immediate=true)
+@Service(value={EventHandler.class, Runnable.class, TopologyEventListener.class})
+@Properties({
+ @Property(name=EventConstants.EVENT_TOPIC, value="*", propertyPrivate=true),
+ @Property(name=EventConstants.EVENT_FILTER, value="(event.distribute=*)", propertyPrivate=true),
+ @Property(name="scheduler.period", longValue=1800, propertyPrivate=true),
+ @Property(name="scheduler.concurrent", boolValue=false, propertyPrivate=true),
+})
+public class DistributedEventReceiver
+ implements EventHandler, Runnable, TopologyEventListener {
+
+ /** Default logger. */
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ @Reference
+ private DistributedEventAdminConfiguration config;
+
+ /** Is the background task still running? */
+ private volatile boolean running;
+
+ /** A local queue for writing received events into the repository. */
+ private final BlockingQueue<Event> writeQueue = new LinkedBlockingQueue<Event>();
+
+ @Reference
+ private ResourceResolverFactory resourceResolverFactory;
+
+ /** Resolver used for writing. */
+ private ResourceResolver writerResolver;
+
+ /** The current instances if we're the leader. */
+ private Set<String> instances;
+
+ @Activate
+ protected void activate() {
+ this.running = true;
+ // start writer thread
+ final Thread writerThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ writerResolver = resourceResolverFactory.getAdministrativeResourceResolver(null);
+ ResourceHelper.getOrCreateBasePath(writerResolver, config.getOwnRootPath());
+ } catch (final Exception e) {
+ // there is nothing we can do except log!
+ logger.error("Error during resource resolver creation.", e);
+ running = false;
+ }
+ try {
+ processWriteQueue();
+ } catch (final Throwable t) { //NOSONAR
+ logger.error("Writer thread stopped with exception: " + t.getMessage(), t);
+ running = false;
+ }
+ if ( writerResolver != null ) {
+ writerResolver.close();
+ writerResolver = null;
+ }
+ }
+ });
+ writerThread.start();
+ }
+
+ /**
+ * Deactivate this component.
+ */
+ @Deactivate
+ protected void deactivate() {
+ // stop background threads by putting empty objects into the queue
+ this.running = false;
+ try {
+ this.writeQueue.put(new Event("some", (Dictionary<String, Object>)null));
+ } catch (final InterruptedException e) {
+ this.ignoreException(e);
+ }
+ }
+
+    /**
+     * Loop of the writer thread: takes the events from the queue and
+     * writes them to the resource tree until the running flag is cleared.
+     */
+    private void processWriteQueue() {
+        while ( this.running ) {
+            // block until the next event is available
+            Event next;
+            try {
+                next = this.writeQueue.take();
+            } catch (final InterruptedException ie) {
+                // we ignore this
+                this.ignoreException(ie);
+                next = null;
+            }
+            // nothing to write after an interruption or during shutdown
+            if ( next == null || !this.running ) {
+                continue;
+            }
+            try {
+                this.writeEvent(next);
+            } catch (final Exception ex) {
+                // a failed write must not stop the writer thread
+                this.logger.error("Exception during writing the event to the resource tree.", ex);
+            }
+        }
+    }
+
+ /** Counter for jobs without an id. */
+ private final AtomicLong eventCounter = new AtomicLong(0);
+
+ /**
+ * Write an event to the resource tree.
+ * @param event The event
+ * @throws PersistenceException
+ */
+ private void writeEvent(final Event event)
+ throws PersistenceException {
+ final Calendar now = Calendar.getInstance();
+
+ final StringBuilder sb = new StringBuilder(this.config.getOwnRootPathWithSlash());
+ sb.append(now.get(Calendar.YEAR));
+ sb.append('/');
+ sb.append(now.get(Calendar.MONTH) + 1);
+ sb.append('/');
+ sb.append(now.get(Calendar.DAY_OF_MONTH));
+ sb.append('/');
+ sb.append(now.get(Calendar.HOUR_OF_DAY));
+ sb.append('/');
+ sb.append(now.get(Calendar.MINUTE));
+ sb.append('/');
+ sb.append("event-");
+ sb.append(String.valueOf(eventCounter.getAndIncrement()));
+
+ // create properties
+ final Map<String, Object> properties = new HashMap<String, Object>();
+
+ final String[] propNames = event.getPropertyNames();
+ if ( propNames != null && propNames.length > 0 ) {
+ for(final String propName : propNames) {
+ if ( !ResourceHelper.ignoreProperty(propName) || JobUtil.JOB_ID.equals(propName) ) { // special handling for job id
+ properties.put(propName, event.getProperty(propName));
+ }
+ }
+ }
+
+ properties.put(EventConstants.EVENT_TOPIC, event.getTopic());
+ properties.put(EventUtil.PROPERTY_APPLICATION, Environment.APPLICATION_ID);
+ properties.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, ResourceHelper.RESOURCE_TYPE_EVENT);
+
+ ResourceHelper.getOrCreateResource(this.writerResolver,
+ sb.toString(),
+ properties);
+ }
+
+    /**
+     * Put the received event into the write queue, the writer thread
+     * stores it later.
+     * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
+     */
+    @Override
+    public void handleEvent(final Event event) {
+        try {
+            writeQueue.put(event);
+        } catch (final InterruptedException ie) {
+            // an interruption while queueing is only logged
+            ignoreException(ie);
+        }
+    }
+
+ /**
+ * Helper method which just logs the exception in debug mode.
+ * @param e
+ */
+ private void ignoreException(final Exception e) {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Ignored exception " + e.getMessage(), e);
+ }
+ }
+
+    /**
+     * This method is invoked periodically.
+     * It first removes the trees of obsolete instances and then the
+     * obsolete events of this instance.
+     * @see java.lang.Runnable#run()
+     */
+    @Override
+    public void run() {
+        cleanUpObsoleteInstances();
+        cleanUpObsoleteEvents();
+    }
+
+    /**
+     * Remove all trees below the root path whose name is not contained in
+     * the set of instances recorded by the last topology event.
+     * The recorded set is consumed: without a new topology event the next
+     * invocation does nothing.
+     */
+    private void cleanUpObsoleteInstances() {
+        final Set<String> slingIds = this.instances;
+        if ( slingIds == null ) {
+            // no instances recorded
+            return;
+        }
+        this.instances = null;
+        this.logger.debug("Checking for old instance trees for distributed events.");
+        ResourceResolver resolver = null;
+        try {
+            resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+
+            final Resource baseResource = resolver.getResource(this.config.getRootPathWithSlash());
+            // sanity check - should never be null
+            if ( baseResource != null ) {
+                for(final Iterator<Resource> children = baseResource.listChildren(); children.hasNext(); ) {
+                    final Resource instanceRoot = children.next();
+                    if ( slingIds.contains(instanceRoot.getName()) ) {
+                        // known instance, keep the tree
+                        continue;
+                    }
+                    resolver.delete(instanceRoot);
+                }
+                resolver.commit();
+            }
+
+        } catch (final PersistenceException pe) {
+            // in the case of an error, we just log this as a warning
+            this.logger.warn("Exception during job resource tree cleanup.", pe);
+        } catch (final LoginException le) {
+            this.ignoreException(le);
+        } finally {
+            if ( resolver != null ) {
+                resolver.close();
+            }
+        }
+    }
+
+ private void cleanUpObsoleteEvents() {
+ if ( this.config.getCleanupPeriod() > 0 ) {
+ this.logger.debug("Cleaning up distributed events, removing all entries older than {} minutes.", this.config.getCleanupPeriod());
+
+ ResourceResolver resolver = null;
+ try {
+ resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+
+ final Resource baseResource = resolver.getResource(this.config.getOwnRootPath());
+ // sanity check - should never be null
+ if ( baseResource != null ) {
+ final Calendar oldDate = Calendar.getInstance();
+ oldDate.add(Calendar.MINUTE, -1 * this.config.getCleanupPeriod());
+
+ // check years
+ final int oldYear = oldDate.get(Calendar.YEAR);
+ final Iterator<Resource> yearIter = baseResource.listChildren();
+ while ( yearIter.hasNext() ) {
+ final Resource yearResource = yearIter.next();
+ final int year = Integer.valueOf(yearResource.getName());
+ if ( year < oldYear ) {
+ resolver.delete(yearResource);
+ } else if ( year == oldYear ) {
+
+ // same year - check months
+ final int oldMonth = oldDate.get(Calendar.MONTH) + 1;
+ final Iterator<Resource> monthIter = yearResource.listChildren();
+ while ( monthIter.hasNext() ) {
+ final Resource monthResource = monthIter.next();
+ final int month = Integer.valueOf(monthResource.getName());
+ if ( month < oldMonth ) {
+ resolver.delete(monthResource);
+ } else if ( month == oldMonth ) {
+
+ // same month - check days
+ final int oldDay = oldDate.get(Calendar.DAY_OF_MONTH);
+ final Iterator<Resource> dayIter = monthResource.listChildren();
+ while ( dayIter.hasNext() ) {
+ final Resource dayResource = dayIter.next();
+ final int day = Integer.valueOf(dayResource.getName());
+ if ( day < oldDay ) {
+ resolver.delete(dayResource);
+ } else if ( day == oldDay ) {
+
+ // same day - check hours
+ final int oldHour = oldDate.get(Calendar.HOUR_OF_DAY);
+ final Iterator<Resource> hourIter = dayResource.listChildren();
+ while ( hourIter.hasNext() ) {
+ final Resource hourResource = hourIter.next();
+ final int hour = Integer.valueOf(hourResource.getName());
+ if ( hour < oldHour ) {
+ resolver.delete(hourResource);
+ } else if ( hour == oldHour ) {
+
+ // same hour - check minutes
+ final int oldMinute = oldDate.get(Calendar.MINUTE);
+ final Iterator<Resource> minuteIter = hourResource.listChildren();
+ while ( minuteIter.hasNext() ) {
+ final Resource minuteResource = minuteIter.next();
+
+ final int minute = Integer.valueOf(minuteResource.getName());
+ if ( minute < oldMinute ) {
+ resolver.delete(minuteResource);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ resolver.commit();
+
+ } catch (final PersistenceException pe) {
+ // in the case of an error, we just log this as a warning
+ this.logger.warn("Exception during job resource tree cleanup.", pe);
+ } catch (final LoginException ignore) {
+ this.ignoreException(ignore);
+ } finally {
+ if ( resolver != null ) {
+ resolver.close();
+ }
+ }
+ }
+ }
+
+    /**
+     * Keep track of the instances in the topology.
+     * The recorded instances are cleared while the topology is changing.
+     * On an init or changed event the Sling ids of all instances of the new
+     * view are recorded - but only if the local instance is the leader.
+     * @see org.apache.sling.discovery.TopologyEventListener#handleTopologyEvent(org.apache.sling.discovery.TopologyEvent)
+     */
+    @Override
+    public void handleTopologyEvent(final TopologyEvent event) {
+        final Type type = event.getType();
+        if ( type == Type.TOPOLOGY_CHANGING ) {
+            this.instances = null;
+            return;
+        }
+        if ( type != Type.TOPOLOGY_CHANGED && type != Type.TOPOLOGY_INIT ) {
+            // all other events are of no interest
+            return;
+        }
+        if ( !event.getNewView().getLocalInstance().isLeader() ) {
+            return;
+        }
+        final Set<String> slingIds = new HashSet<String>();
+        for(final InstanceDescription instance : event.getNewView().getInstances() ) {
+            slingIds.add(instance.getSlingId());
+        }
+        this.instances = slingIds;
+    }
+}
+
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventReceiver.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventReceiver.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventReceiver.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventSender.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventSender.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventSender.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventSender.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.dea;
+
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Properties;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.api.SlingConstants;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.api.resource.ResourceUtil;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.jobs.JobUtil;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import aQute.bnd.annotation.component.Deactivate;
+
+/**
+ * This event handler distributes events across an application cluster.
+ *
+ * The paths of event resources added outside of the own root path are
+ * queued and a background thread posts the stored events to the local event admin.
+ */
+@Component(immediate=true)
+@Service(value={EventHandler.class})
+@Properties({
+ @Property(name=EventConstants.EVENT_TOPIC, value=SlingConstants.TOPIC_RESOURCE_ADDED, propertyPrivate=true),
+})
+public class DistributedEventSender
+ implements EventHandler {
+
+ /** Default logger. */
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ /** Is the background task still running? */
+ private volatile boolean running;
+
+ /** A local queue for serializing the event processing. */
+ private final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
+
+ @Reference
+ private ResourceResolverFactory resourceResolverFactory;
+
+ @Reference
+ private DistributedEventAdminConfiguration config;
+
+ @Reference
+ private EventAdmin eventAdmin;
+
+    /**
+     * Activate this component.
+     * Sets the running flag and starts the background thread which
+     * processes the queued paths.
+     */
+    @Activate
+    protected void activate() {
+        this.running = true;
+        final Runnable backgroundTask = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    runInBackground();
+                } catch (Throwable cause) { //NOSONAR
+                    // the thread ends here, mark the component as not running
+                    logger.error("Background thread stopped with exception: " + cause.getMessage(), cause);
+                    running = false;
+                }
+            }
+        };
+        new Thread(backgroundTask).start();
+    }
+
+ /**
+ * Deactivate this component.
+ */
+ @Deactivate
+ protected void deactivate() {
+ // stop background threads by putting empty objects into the queue
+ this.running = false;
+ try {
+ this.queue.put("");
+ } catch (final InterruptedException e) {
+ this.ignoreException(e);
+ }
+ }
+
+    /**
+     * Read an event from the resource.
+     * @param eventResource The resource holding the stored event
+     * @return The event object or <code>null</code> if the resource has a
+     *         read error marker, no topic or an invalid topic
+     */
+    private Event readEvent(final Resource eventResource) {
+        final ValueMap vm = ResourceUtil.getValueMap(eventResource);
+        final String topic = vm.get(EventConstants.EVENT_TOPIC, String.class);
+        final Map<String, Object> properties = ResourceHelper.cloneValueMap(vm);
+        // only send event if there are no read errors, otherwise discard it
+        if ( properties.get(ResourceHelper.PROPERTY_MARKER_READ_ERROR) != null ) {
+            return null;
+        }
+        if ( topic == null ) {
+            // without this check the NullPointerException below would
+            // terminate the background thread
+            logger.error("Unable to read event: no topic found at {}", eventResource.getPath());
+            return null;
+        }
+        properties.remove(EventConstants.EVENT_TOPIC);
+
+        try {
+            // special handling for job notification jobs for compatibility
+            if ( topic.startsWith("org/apache/sling/event/notification/job/") ) {
+                final String jobTopic = (String)properties.get(JobUtil.NOTIFICATION_PROPERTY_JOB_TOPIC);
+                if ( jobTopic != null) {
+                    // the notification carries the job as an event in its properties
+                    final Event jobEvent = new Event(jobTopic, properties);
+                    properties.put(JobUtil.PROPERTY_NOTIFICATION_JOB, jobEvent);
+                }
+            }
+            return new Event(topic, properties);
+        } catch (final IllegalArgumentException iae) {
+            // this exception occurs if the topic is not correct (it should never happen,
+            // but you never know)
+            logger.error("Unable to read event: " + iae.getMessage(), iae);
+        }
+        return null;
+    }
+
+    /**
+     * Loop of the background thread: takes the paths from the queue, reads
+     * the stored event from each path and posts it to the event admin until
+     * the running flag is cleared.
+     */
+    private void runInBackground() {
+        while ( this.running ) {
+            // so let's wait/get the next event from the queue
+            String path = null;
+            try {
+                path = this.queue.take();
+            } catch (final InterruptedException e) {
+                // we ignore this
+                this.ignoreException(e);
+            }
+            // the empty path is only used to wake up this thread
+            if ( path != null && path.length() > 0 && this.running ) {
+                ResourceResolver resolver = null;
+                try {
+                    resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+                    final Resource eventResource = resolver.getResource(path);
+                    // the resource might be gone in the meantime; without the null
+                    // check a NullPointerException would terminate this thread
+                    if ( eventResource != null && eventResource.isResourceType(ResourceHelper.RESOURCE_TYPE_EVENT) ) {
+                        final Event e = this.readEvent(eventResource);
+                        if ( e != null ) {
+                            // we check event admin as processing is async
+                            final EventAdmin localEA = this.eventAdmin;
+                            if ( localEA != null ) {
+                                localEA.postEvent(e);
+                            } else {
+                                this.logger.error("Unable to post event as no event admin is available.");
+                            }
+                        }
+                    }
+                } catch (final LoginException ex) {
+                    this.logger.error("Exception during creation of resource resolver.", ex);
+                } finally {
+                    if ( resolver != null ) {
+                        resolver.close();
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Queue the path of an added resource if it is below the root path for
+     * distributed events but not below the root path of this instance.
+     * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
+     */
+    @Override
+    public void handleEvent(final Event event) {
+        final String path = (String) event.getProperty(SlingConstants.PROPERTY_PATH);
+        if ( path == null ) {
+            return;
+        }
+        if ( !path.startsWith(this.config.getRootPathWithSlash()) ) {
+            // not a distributed event
+            return;
+        }
+        if ( path.startsWith(this.config.getOwnRootPathWithSlash()) ) {
+            // stored below the root path of this instance
+            return;
+        }
+        try {
+            this.queue.put(path);
+        } catch (final InterruptedException ie) {
+            // we ignore this
+            this.ignoreException(ie);
+        }
+    }
+
+ /**
+ * Helper method which just logs the exception in debug mode.
+ * @param e
+ */
+ private void ignoreException(final Exception e) {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Ignored exception " + e.getMessage(), e);
+ }
+ }
+}
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventSender.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventSender.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/dea/DistributedEventSender.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java?rev=1470462&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java (added)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java Mon Apr 22 11:42:53 2013
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs;
+
+import java.util.Calendar;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.jackrabbit.util.ISO8601;
+import org.apache.jackrabbit.util.ISO9075;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.ModifiableValueMap;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.QuerySyntaxException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.jobs.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Task for loading stored jobs from the repository.
+ *
+ * TODO - we need better synching - especially something for loadJob(String)
+ */
+public class BackgroundLoader implements Runnable {
+
+ /** Default logger. */
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ /** The job manager configuration. */
+ private final JobManagerConfiguration configuration;
+
+ /** Resource resolver factory. */
+ private final ResourceResolverFactory resourceResolverFactory;
+
+ /** Is this still active? */
+ private final AtomicBoolean active = new AtomicBoolean(false);
+
+ private volatile boolean running = false;
+
+ /** Job Manager implementation. */
+ private final JobManagerImpl jobManager;
+
+ /** Lock object for loading */
+ private final Object loadLock = new Object();
+
+ /** Lock object for stopping */
+ private final Object stopLock = new Object();
+
+ /** Unloaded jobs. */
+ private final Set<String> unloadedJobs = new HashSet<String>();
+
+ /** A local queue for handling new jobs. */
+ private final BlockingQueue<String> actionQueue = new LinkedBlockingQueue<String>();
+
+ private long changeCount = 0;
+
+ /**
+ * Create and activate the loader.
+ */
+ public BackgroundLoader(final JobManagerImpl jobManagerImpl,
+ final JobManagerConfiguration configuration2,
+ final ResourceResolverFactory resourceResolverFactory2) {
+ this.resourceResolverFactory = resourceResolverFactory2;
+ this.configuration = configuration2;
+ this.jobManager = jobManagerImpl;
+ this.active.set(true);
+ // start background thread
+ final Thread loaderThread = new Thread(this, "Apache Sling Job Background Loader");
+ loaderThread.setDaemon(true);
+ loaderThread.start();
+ }
+
+ /**
+ * Deactivate the loader.
+ */
+ public void deactivate() {
+ this.active.set(false);
+ // make sure to stop background thread
+ synchronized ( this.loadLock ) {
+ this.running = false;
+ this.loadLock.notify();
+ }
+ this.stop();
+ synchronized ( this.stopLock ) {
+ this.stopLock.notify();
+ }
+ }
+
+ /**
+ * Helper method which just logs the exception in debug mode.
+ * @param e
+ */
+ private void ignoreException(final Exception e) {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Ignored exception " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Start the background loader process.
+ */
+ public void start() {
+ synchronized ( this.loadLock ) {
+ this.changeCount++;
+ this.running = true;
+ this.loadLock.notify();
+ // make sure to clear out old information
+ this.actionQueue.clear();
+ this.unloadedJobs.clear();
+ }
+ }
+
+ private static final String END_TOKEN = "*";
+
+ /**
+ * Stop the background loader process.
+ */
+ public void stop() {
+ synchronized ( this.loadLock ) {
+ this.running = false;
+ }
+ // stop action queue
+ try {
+ this.actionQueue.put(END_TOKEN);
+ } catch (final InterruptedException e) {
+ this.ignoreException(e);
+ }
+ }
+
+ /**
+ * Restart if the instance is currently running.
+ */
+ public void restart() {
+ if ( this.isRunning() ) {
+ this.stop();
+ this.start();
+ }
+ }
+
+ @Override
+ public void run() {
+ while ( this.active.get() ) {
+ final long startTime;
+ // we have to wait to get started
+ synchronized ( this.loadLock ) {
+ while ( this.active.get() && !this.running ) {
+ try {
+ this.loadLock.wait();
+ } catch (final InterruptedException e) {
+ // ignore
+ }
+ }
+ startTime = System.currentTimeMillis();
+ }
+
+ // give the system some time to start
+ if ( this.isRunning() ) {
+ synchronized ( this.stopLock ) {
+ try {
+ this.stopLock.wait(1000 * this.configuration.getBackgroundLoadDelay());
+ } catch (final InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+
+ // load jobs from the resource tree
+ if ( this.isRunning() ) {
+ this.loadJobsInTheBackground(startTime);
+ }
+
+ // and finally process the action queue
+ while ( this.isRunning() ) {
+ String path = null;
+ try {
+ path = this.actionQueue.take();
+ } catch (final InterruptedException e) {
+ this.ignoreException(e);
+ }
+ if ( path != null && !END_TOKEN.equals(path) && this.isRunning() ) {
+ ResourceResolver resolver = null;
+ try {
+ resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+ final Resource resource = resolver.getResource(path);
+ if ( ResourceHelper.RESOURCE_TYPE_JOB.equals(resource.getResourceType()) ) {
+ this.logger.debug("Reading local job from {}", path);
+ final JobImpl job = this.jobManager.readJob(resource);
+ if ( job != null ) {
+ if ( job.hasReadErrors() ) {
+ synchronized ( this.unloadedJobs ) {
+ this.unloadedJobs.add(path);
+ }
+ } else {
+ this.jobManager.process(job);
+ }
+ }
+ }
+ } catch ( final LoginException le ) {
+ // administrative login should always work
+ this.ignoreException(le);
+ } finally {
+ if ( resolver != null ) {
+ resolver.close();
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Load all active jobs from the resource tree.
+ */
+ private void loadJobsInTheBackground(final long startTime) {
+ logger.debug("Starting background loading...");
+
+ ResourceResolver resolver = null;
+ long count = 0;
+ try {
+ resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+ final Calendar startDate = Calendar.getInstance();
+ startDate.setTimeInMillis(startTime);
+
+ final StringBuilder buf = new StringBuilder(64);
+
+ buf.append("//element(*,");
+ buf.append(ResourceHelper.RESOURCE_TYPE_JOB);
+ buf.append(")[@");
+ buf.append(ISO9075.encode(Job.PROPERTY_JOB_TARGET_INSTANCE));
+ buf.append(" = '");
+ buf.append(Environment.APPLICATION_ID);
+ buf.append("' and @");
+ buf.append(ISO9075.encode(Job.PROPERTY_JOB_CREATED));
+ buf.append(" < xs:dateTime('");
+ buf.append(ISO8601.format(startDate));
+ buf.append("')");
+ buf.append("] order by @");
+ buf.append(ISO9075.encode(Job.PROPERTY_JOB_CREATED));
+ buf.append(" ascending");
+ if ( this.isRunning() ) {
+ final Iterator<Resource> result = resolver.findResources(buf.toString(), "xpath");
+
+ while ( this.isRunning() && result.hasNext() ) {
+ final Resource jobResource = result.next();
+ if ( this.loadJobInTheBackground(jobResource) ) {
+ count++;
+ }
+ }
+ }
+ } catch (final QuerySyntaxException qse) {
+ this.ignoreException(qse);
+ } catch (final LoginException le) {
+ this.ignoreException(le);
+ } finally {
+ if ( resolver != null ) {
+ resolver.close();
+ }
+ }
+
+ logger.debug("Finished background loading of {} jobs.", count);
+ }
+
+ /**
+ * Load a single job from the resource tree.
+ */
+ private boolean loadJobInTheBackground(final Resource jobResource) {
+ // sanity check for the path
+ if ( this.configuration.isLocalJob(jobResource.getPath()) ) {
+ final JobImpl job = this.jobManager.readJob(jobResource);
+ if ( job != null ) {
+ // check if the job is currently running
+ if ( this.changeCount == 1 || job.getProcessingStarted() == null ) {
+ // reset started time
+ if ( job.getProcessingStarted() != null && this.isRunning() ) {
+ job.getProperties().remove(Job.PROPERTY_JOB_STARTED_TIME);
+ // make sure to clear the started time
+ try {
+ final ModifiableValueMap mvm = jobResource.adaptTo(ModifiableValueMap.class);
+ mvm.remove(Job.PROPERTY_JOB_STARTED_TIME);
+ jobResource.getResourceResolver().commit();
+ } catch ( final PersistenceException ignore) {
+ this.ignoreException(ignore);
+ }
+ }
+
+ if ( job.hasReadErrors() ) {
+ synchronized ( this.unloadedJobs ) {
+ this.unloadedJobs.add(job.getResourcePath());
+ }
+ } else {
+ if ( this.isRunning() ) {
+ this.jobManager.process(job);
+ }
+ }
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ private boolean isRunning() {
+ return this.active.get() && this.running;
+ }
+
+ /**
+ * Try to reload unloaded jobs - this method is invoked if bundles have been added etc.
+ */
+ public void tryToReloadUnloadedJobs() {
+ // bundle event started or updated
+ final Set<String> copyUnloadedJobs = new HashSet<String>();
+ synchronized ( this.unloadedJobs ) {
+ copyUnloadedJobs.addAll(this.unloadedJobs);
+ this.unloadedJobs.clear();
+ }
+ if ( copyUnloadedJobs.size() > 0 ) {
+ final Runnable t = new Runnable() {
+
+ @Override
+ public void run() {
+ synchronized ( loadLock ) {
+ if ( isRunning() ) {
+ final Iterator<String> iter = copyUnloadedJobs.iterator();
+ while ( iter.hasNext() ) {
+ try {
+ actionQueue.put(iter.next());
+ } catch (InterruptedException e) {
+ ignoreException(e);
+ }
+ }
+ }
+ }
+ }
+
+ };
+ Environment.THREAD_POOL.execute(t);
+ }
+ }
+
+ public void loadJob(final String path) {
+ try {
+ this.actionQueue.put(path);
+ } catch (final InterruptedException e) {
+ this.ignoreException(e);
+ }
+ }
+}
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java
------------------------------------------------------------------------------
svn:keywords = author date id revision rev url
Propchange: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/BackgroundLoader.java
------------------------------------------------------------------------------
svn:mime-type = text/plain