You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by ro...@apache.org on 2017/10/18 23:19:26 UTC
[sling-org-apache-sling-event-dea] 01/29: SLING-3882 : Move the
distributed event admin into a separate project
This is an automated email from the ASF dual-hosted git repository.
rombert pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-event-dea.git
commit 10facb4225ca07cbe08bce163171ca2f38da640d
Author: Carsten Ziegeler <cz...@apache.org>
AuthorDate: Tue Aug 26 08:24:57 2014 +0000
SLING-3882 : Move the distributed event admin into a separate project
git-svn-id: https://svn.apache.org/repos/asf/sling/trunk@1620527 13f79535-47bb-0310-9956-ffa450edef68
---
pom.xml | 148 +++++++
.../org/apache/sling/event/dea/DEAConstants.java | 47 +++
.../event/dea/impl/DistributedEventAdminImpl.java | 102 +++++
.../event/dea/impl/DistributedEventReceiver.java | 438 +++++++++++++++++++++
.../event/dea/impl/DistributedEventSender.java | 243 ++++++++++++
.../sling/event/dea/impl/ResourceHelper.java | 96 +++++
.../org/apache/sling/event/dea/package-info.java | 24 ++
.../dea/impl/DistributingEventHandlerTest.java | 182 +++++++++
8 files changed, 1280 insertions(+)
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..afeacbe
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,148 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>sling</artifactId>
+ <version>19</version>
+ <relativePath>../../../parent/pom.xml</relativePath>
+ </parent>
+
+ <artifactId>org.apache.sling.event.dea</artifactId>
+ <packaging>bundle</packaging>
+ <version>0.0.1-SNAPSHOT</version>
+
+ <name>Apache Sling Distributed Event Admin</name>
+ <description>
+ Support distributing events through the OSGi event admin.
+ </description>
+
+ <scm>
+ <connection>scm:svn:http://svn.apache.org/repos/asf/sling/trunk/bundles/extensions/dea</connection>
+ <developerConnection>scm:svn:https://svn.apache.org/repos/asf/sling/trunk/bundles/extensions/dea</developerConnection>
+ <url>http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/dea</url>
+ </scm>
+
+ <properties>
+ <sling.java.version>6</sling.java.version>
+ </properties>
+
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-scr-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <extensions>true</extensions>
+ </plugin>
+ </plugins>
+ </build>
+ <reporting>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <configuration>
+ <excludePackageNames>
+ org.apache.sling.event.dea.impl
+ </excludePackageNames>
+ </configuration>
+ </plugin>
+ </plugins>
+ </reporting>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>org.apache.felix.scr.annotations</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.discovery.api</artifactId>
+ <version>1.0.0</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.core</artifactId>
+ <version>5.0.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.compendium</artifactId>
+ <version>5.0.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.settings</artifactId>
+ <version>1.0.0</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.commons.osgi</artifactId>
+ <version>2.2.0</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.api</artifactId>
+ <version>2.7.1-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+ <!-- Testing -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit-addons</groupId>
+ <artifactId>junit-addons</artifactId>
+ <version>1.4</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>1.9.5</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.testing.resourceresolver-mock</artifactId>
+ <version>0.2.0</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+</project>
diff --git a/src/main/java/org/apache/sling/event/dea/DEAConstants.java b/src/main/java/org/apache/sling/event/dea/DEAConstants.java
new file mode 100644
index 0000000..6ea8718
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/dea/DEAConstants.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.dea;
+
+
+/**
+ * The <code>DEAConstants</code> provides some constants for
+ * handling distributed OSGi events.
+ * <p>
+ * If an event should be sent to other instances, the event
+ * property {@link #PROPERTY_DISTRIBUTE} should be set to
+ * an empty string.
+ * <p>
+ * An event, regardless if distributed or not, should never be
+ * created with the property {@link #PROPERTY_APPLICATION}. In
+ * addition properties starting with "event.dea." are reserved
+ * attributes of this implementation and must not be used
+ * by custom events.
+ * <p>
+ * If the event is a local event, the {@link #PROPERTY_APPLICATION}
+ * is not available. If it is available, it contains the application
+ * (Sling ID) of the instance where the event originated.
+ */
+public abstract class DEAConstants {
+
+ /** This event property indicates, if the event should be distributed in the cluster. */
+ public static final String PROPERTY_DISTRIBUTE = "event.distribute";
+
+ /** This event property specifies the application node. */
+ public static final String PROPERTY_APPLICATION = "event.application";
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/sling/event/dea/impl/DistributedEventAdminImpl.java b/src/main/java/org/apache/sling/event/dea/impl/DistributedEventAdminImpl.java
new file mode 100644
index 0000000..df128b4
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/dea/impl/DistributedEventAdminImpl.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.dea.impl;
+
+import java.util.Map;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.settings.SlingSettingsService;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.event.EventAdmin;
+
+/**
+ * This service wraps the configuration of the distributed event admin
+ * and starts the different parts.
+ */
+@Component(name="org.apache.sling.event.impl.DistributingEventHandler")
+public class DistributedEventAdminImpl {
+
+ public static final String RESOURCE_TYPE_FOLDER = "sling:Folder";
+
+ public static final String RESOURCE_TYPE_EVENT = "sling/distributed/event";
+
+ @Reference
+ private SlingSettingsService settings;
+
+ @Reference
+ private ResourceResolverFactory resourceResolverFactory;
+
+ @Reference
+ private EventAdmin eventAdmin;
+
+ /** Default repository path. */
+ public static final String DEFAULT_REPOSITORY_PATH = "/var/eventing/distribution";
+
+ /** The path where all jobs are stored. */
+ @Property(value=DEFAULT_REPOSITORY_PATH)
+ private static final String CONFIG_PROPERTY_REPOSITORY_PATH = "repository.path";
+
+ /** Default clean up time is 15 minutes. */
+ private static final int DEFAULT_CLEANUP_PERIOD = 15;
+
+ @Property(intValue=DEFAULT_CLEANUP_PERIOD)
+ private static final String CONFIG_PROPERTY_CLEANUP_PERIOD = "cleanup.period";
+
+ /** The local receiver of distributed events .*/
+ private DistributedEventReceiver receiver;
+
+ /** The local sender for distributed events. */
+ private DistributedEventSender sender;
+
+ @Activate
+ protected void activate(final BundleContext bundleContext, final Map<String, Object> props) {
+ final int cleanupPeriod = PropertiesUtil.toInteger(props.get(CONFIG_PROPERTY_CLEANUP_PERIOD), DEFAULT_CLEANUP_PERIOD);
+ final String rootPath = PropertiesUtil.toString(props.get(
+ CONFIG_PROPERTY_REPOSITORY_PATH), DEFAULT_REPOSITORY_PATH);
+ final String ownRootPath = rootPath.concat("/").concat(settings.getSlingId());
+
+ this.receiver = new DistributedEventReceiver(bundleContext,
+ rootPath,
+ ownRootPath,
+ cleanupPeriod,
+ this.resourceResolverFactory, this.settings);
+ this.sender = new DistributedEventSender(bundleContext,
+ rootPath,
+ ownRootPath,
+ this.resourceResolverFactory, this.eventAdmin);
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ if ( this.receiver != null ) {
+ this.receiver.stop();
+ this.receiver = null;
+ }
+ if ( this.sender != null ) {
+ this.sender.stop();
+ this.sender = null;
+ }
+ }
+}
diff --git a/src/main/java/org/apache/sling/event/dea/impl/DistributedEventReceiver.java b/src/main/java/org/apache/sling/event/dea/impl/DistributedEventReceiver.java
new file mode 100644
index 0000000..58ed940
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/dea/impl/DistributedEventReceiver.java
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.dea.impl;
+
+import java.util.Calendar;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.api.resource.ResourceUtil;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyEvent.Type;
+import org.apache.sling.discovery.TopologyEventListener;
+import org.apache.sling.event.dea.DEAConstants;
+import org.apache.sling.settings.SlingSettingsService;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is the distributed event receiver.
+ * It listens for all distributable events and stores them in the
+ * repository for other cluster instance to pick them up.
+ * <p>
+ * This component is scheduled to run some clean up tasks in the
+ * background periodically.
+ * <p>
+ */
+public class DistributedEventReceiver
+ implements EventHandler, Runnable, TopologyEventListener {
+
+ /** Special topic to stop the queue. */
+ private static final String TOPIC_STOPPED = "org/apache/sling/event/dea/impl/STOPPED";
+
+ /** Logger */
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ /** A local queue for writing received events into the repository. */
+ private final BlockingQueue<Event> writeQueue = new LinkedBlockingQueue<Event>();
+
+ /** The resource resolver factory. */
+ private final ResourceResolverFactory resourceResolverFactory;
+
+ /** The current instance id. */
+ private final String slingId;
+
+ /** The root path for events . */
+ private final String rootPath;
+
+ /** The root path for events written by this instance. */
+ private final String ownRootPath;
+
+ /** The cleanup period. */
+ private final int cleanupPeriod;
+
+ /** Resolver used for writing. */
+ private volatile ResourceResolver writerResolver;
+
+ /** Is the background task still running? */
+ private volatile boolean running;
+
+ /** The current instances if this is the leader. */
+ private volatile Set<String> instances;
+
+ /** The service registration. */
+ private volatile ServiceRegistration<?> serviceRegistration;
+
+ public DistributedEventReceiver(final BundleContext bundleContext,
+ final String rootPath,
+ final String ownRootPath,
+ final int cleanupPeriod,
+ final ResourceResolverFactory rrFactory,
+ final SlingSettingsService settings) {
+ this.rootPath = rootPath;
+ this.ownRootPath = ownRootPath;
+ this.resourceResolverFactory = rrFactory;
+ this.slingId = settings.getSlingId();
+ this.cleanupPeriod = cleanupPeriod;
+
+ this.running = true;
+ // start writer thread
+ final Thread writerThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ // create service registration properties
+ final Dictionary<String, Object> props = new Hashtable<String, Object>();
+ props.put(Constants.SERVICE_VENDOR, "The Apache Software Foundation");
+
+ // listen for all OSGi events with the distributable flag
+ props.put(EventConstants.EVENT_TOPIC, "*");
+ props.put(EventConstants.EVENT_FILTER, "(" + DEAConstants.PROPERTY_DISTRIBUTE + "=*)");
+ // schedule this service every 30 minutes
+ props.put("scheduler.period", 1800L);
+ props.put("scheduler.concurrent", Boolean.FALSE);
+
+ final ServiceRegistration<?> reg =
+ bundleContext.registerService(new String[] {EventHandler.class.getName(),
+ Runnable.class.getName(),
+ TopologyEventListener.class.getName()},
+ DistributedEventReceiver.this, props);
+
+ DistributedEventReceiver.this.serviceRegistration = reg;
+
+ try {
+ writerResolver = resourceResolverFactory.getAdministrativeResourceResolver(null);
+ ResourceUtil.getOrCreateResource(writerResolver,
+ ownRootPath,
+ DistributedEventAdminImpl.RESOURCE_TYPE_FOLDER,
+ DistributedEventAdminImpl.RESOURCE_TYPE_FOLDER,
+ true);
+ } catch (final Exception e) {
+ // there is nothing we can do except log!
+ logger.error("Error during resource resolver creation.", e);
+ running = false;
+ }
+ try {
+ processWriteQueue();
+ } catch (final Throwable t) { //NOSONAR
+ logger.error("Writer thread stopped with exception: " + t.getMessage(), t);
+ running = false;
+ }
+ if ( writerResolver != null ) {
+ writerResolver.close();
+ writerResolver = null;
+ }
+ }
+ });
+ writerThread.start();
+ }
+
+ /**
+ * Deactivate this component.
+ */
+ public void stop() {
+ if ( this.serviceRegistration != null ) {
+ this.serviceRegistration.unregister();
+ this.serviceRegistration = null;
+ }
+ // stop background threads by putting empty objects into the queue
+ this.running = false;
+ try {
+ this.writeQueue.put(new Event(TOPIC_STOPPED, (Dictionary<String, Object>)null));
+ } catch (final InterruptedException e) {
+ this.ignoreException(e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Background thread writing events into the queue.
+ */
+ private void processWriteQueue() {
+ while ( this.running ) {
+ // so let's wait/get the next event from the queue
+ Event event = null;
+ try {
+ event = this.writeQueue.take();
+ } catch (final InterruptedException e) {
+ this.ignoreException(e);
+ Thread.currentThread().interrupt();
+ this.running = false;
+ }
+ if ( event != null && this.running ) {
+ try {
+ this.writeEvent(event);
+ } catch (final Exception e) {
+ this.logger.error("Exception during writing the event to the resource tree.", e);
+ }
+ }
+ }
+ }
+
+ /** Counter for events. */
+ private final AtomicLong eventCounter = new AtomicLong(0);
+
+ /**
+ * Write an event to the resource tree.
+ * @param event The event
+ * @throws PersistenceException
+ */
+ private void writeEvent(final Event event)
+ throws PersistenceException {
+ final Calendar now = Calendar.getInstance();
+
+ final StringBuilder sb = new StringBuilder(this.ownRootPath);
+ sb.append('/');
+ sb.append(now.get(Calendar.YEAR));
+ sb.append('/');
+ sb.append(now.get(Calendar.MONTH) + 1);
+ sb.append('/');
+ sb.append(now.get(Calendar.DAY_OF_MONTH));
+ sb.append('/');
+ sb.append(now.get(Calendar.HOUR_OF_DAY));
+ sb.append('/');
+ sb.append(now.get(Calendar.MINUTE));
+ sb.append('/');
+ sb.append("event-");
+ sb.append(String.valueOf(eventCounter.getAndIncrement()));
+
+ // create properties
+ final Map<String, Object> properties = new HashMap<String, Object>();
+
+ final String[] propNames = event.getPropertyNames();
+ if ( propNames != null && propNames.length > 0 ) {
+ for(final String propName : propNames) {
+ properties.put(propName, event.getProperty(propName));
+ }
+ }
+
+ properties.remove(DEAConstants.PROPERTY_DISTRIBUTE);
+ properties.put(EventConstants.EVENT_TOPIC, event.getTopic());
+ properties.put(DEAConstants.PROPERTY_APPLICATION, this.slingId);
+ final Object oldRT = properties.get(ResourceResolver.PROPERTY_RESOURCE_TYPE);
+ if ( oldRT != null ) {
+ properties.put("event.dea." + ResourceResolver.PROPERTY_RESOURCE_TYPE, oldRT);
+ }
+ properties.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, DistributedEventAdminImpl.RESOURCE_TYPE_EVENT);
+ ResourceUtil.getOrCreateResource(this.writerResolver,
+ sb.toString(),
+ properties,
+ DistributedEventAdminImpl.RESOURCE_TYPE_FOLDER,
+ true);
+ }
+
+ /**
+ * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
+ */
+ @Override
+ public void handleEvent(final Event event) {
+ try {
+ this.writeQueue.put(event);
+ } catch (final InterruptedException ex) {
+ this.ignoreException(ex);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Helper method which just logs the exception in debug mode.
+ * @param e
+ */
+ private void ignoreException(final Exception e) {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Ignored exception " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * This method is invoked periodically.
+ * @see java.lang.Runnable#run()
+ */
+ @Override
+ public void run() {
+ this.cleanUpObsoleteInstances();
+ this.cleanUpObsoleteEvents();
+ }
+
+ private void cleanUpObsoleteInstances() {
+ final Set<String> slingIds = this.instances;
+ if ( slingIds != null ) {
+ this.instances = null;
+ this.logger.debug("Checking for old instance trees for distributed events.");
+ ResourceResolver resolver = null;
+ try {
+ resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+
+ final Resource baseResource = resolver.getResource(this.rootPath);
+ // sanity check - should never be null
+ if ( baseResource != null ) {
+ final ResourceUtil.BatchResourceRemover brr = ResourceUtil.getBatchResourceResourceRemover(50);
+ final Iterator<Resource> iter = baseResource.listChildren();
+ while ( iter.hasNext() ) {
+ final Resource rootResource = iter.next();
+ if ( !slingIds.contains(rootResource.getName()) ) {
+ brr.delete(rootResource);
+ }
+ }
+ // final commit for outstanding deletes
+ resolver.commit();
+ }
+
+ } catch (final PersistenceException pe) {
+ // in the case of an error, we just log this as a warning
+ this.logger.warn("Exception during job resource tree cleanup.", pe);
+ } catch (final LoginException ignore) {
+ this.ignoreException(ignore);
+ } finally {
+ if ( resolver != null ) {
+ resolver.close();
+ }
+ }
+ }
+ }
+
+ private void cleanUpObsoleteEvents() {
+ if ( this.cleanupPeriod > 0 ) {
+ this.logger.debug("Cleaning up distributed events, removing all entries older than {} minutes.", this.cleanupPeriod);
+
+ ResourceResolver resolver = null;
+ try {
+ resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+ final ResourceUtil.BatchResourceRemover brr = ResourceUtil.getBatchResourceResourceRemover(50);
+
+ final Resource baseResource = resolver.getResource(this.ownRootPath);
+ // sanity check - should never be null
+ if ( baseResource != null ) {
+ final Calendar oldDate = Calendar.getInstance();
+ oldDate.add(Calendar.MINUTE, -1 * this.cleanupPeriod);
+
+ // check years
+ final int oldYear = oldDate.get(Calendar.YEAR);
+ final Iterator<Resource> yearIter = baseResource.listChildren();
+ while ( yearIter.hasNext() ) {
+ final Resource yearResource = yearIter.next();
+ final int year = Integer.valueOf(yearResource.getName());
+ if ( year < oldYear ) {
+ brr.delete(yearResource);
+ } else if ( year == oldYear ) {
+
+ // same year - check months
+ final int oldMonth = oldDate.get(Calendar.MONTH) + 1;
+ final Iterator<Resource> monthIter = yearResource.listChildren();
+ while ( monthIter.hasNext() ) {
+ final Resource monthResource = monthIter.next();
+ final int month = Integer.valueOf(monthResource.getName());
+ if ( month < oldMonth ) {
+ brr.delete(monthResource);
+ } else if ( month == oldMonth ) {
+
+ // same month - check days
+ final int oldDay = oldDate.get(Calendar.DAY_OF_MONTH);
+ final Iterator<Resource> dayIter = monthResource.listChildren();
+ while ( dayIter.hasNext() ) {
+ final Resource dayResource = dayIter.next();
+ final int day = Integer.valueOf(dayResource.getName());
+ if ( day < oldDay ) {
+ brr.delete(dayResource);
+ } else if ( day == oldDay ) {
+
+ // same day - check hours
+ final int oldHour = oldDate.get(Calendar.HOUR_OF_DAY);
+ final Iterator<Resource> hourIter = dayResource.listChildren();
+ while ( hourIter.hasNext() ) {
+ final Resource hourResource = hourIter.next();
+ final int hour = Integer.valueOf(hourResource.getName());
+ if ( hour < oldHour ) {
+ brr.delete(hourResource);
+ } else if ( hour == oldHour ) {
+
+ // same hour - check minutes
+ final int oldMinute = oldDate.get(Calendar.MINUTE);
+ final Iterator<Resource> minuteIter = hourResource.listChildren();
+ while ( minuteIter.hasNext() ) {
+ final Resource minuteResource = minuteIter.next();
+
+ final int minute = Integer.valueOf(minuteResource.getName());
+ if ( minute < oldMinute ) {
+ brr.delete(minuteResource);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ // final commit for outstanding resources
+ resolver.commit();
+
+ } catch (final PersistenceException pe) {
+ // in the case of an error, we just log this as a warning
+ this.logger.warn("Exception during job resource tree cleanup.", pe);
+ } catch (final LoginException ignore) {
+ this.ignoreException(ignore);
+ } finally {
+ if ( resolver != null ) {
+ resolver.close();
+ }
+ }
+ }
+ }
+
+ /**
+ * @see org.apache.sling.discovery.TopologyEventListener#handleTopologyEvent(org.apache.sling.discovery.TopologyEvent)
+ */
+ @Override
+ public void handleTopologyEvent(final TopologyEvent event) {
+ if ( event.getType() == Type.TOPOLOGY_CHANGING ) {
+ this.instances = null;
+ } else if ( event.getType() == Type.TOPOLOGY_CHANGED || event.getType() == Type.TOPOLOGY_INIT ) {
+ if ( event.getNewView().getLocalInstance().isLeader() ) {
+ final Set<String> set = new HashSet<String>();
+ for(final InstanceDescription desc : event.getNewView().getInstances() ) {
+ set.add(desc.getSlingId());
+ }
+ this.instances = set;
+ }
+ }
+ }
+}
+
diff --git a/src/main/java/org/apache/sling/event/dea/impl/DistributedEventSender.java b/src/main/java/org/apache/sling/event/dea/impl/DistributedEventSender.java
new file mode 100644
index 0000000..95aa021
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/dea/impl/DistributedEventSender.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.dea.impl;
+
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.sling.api.SlingConstants;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.event.dea.DEAConstants;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This event handler distributes events from other application nodes in
+ * the cluster on the current instance.
+ * <p>
+ * It's listening for resource added events in the resource tree storing the
+ * distributed events. If a new resource is added, the resource is read,
+ * converted to an event and then send using the local event admin.
+ * <p>
+ */
+public class DistributedEventSender
+ implements EventHandler {
+
+ /** Default logger. */
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ /** Is the background task still running? */
+ private volatile boolean running;
+
+ /** A local queue for serializing the event processing. */
+ private final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
+
+ private final ResourceResolverFactory resourceResolverFactory;
+
+ private final EventAdmin eventAdmin;
+
+ private final String ownRootPathWithSlash;
+
+ private volatile ServiceRegistration<?> serviceRegistration;
+
+ public DistributedEventSender(final BundleContext bundleContext,
+ final String rootPath,
+ final String ownRootPath,
+ final ResourceResolverFactory rrFactory,
+ final EventAdmin eventAdmin) {
+ this.eventAdmin = eventAdmin;
+ this.resourceResolverFactory = rrFactory;
+ this.ownRootPathWithSlash = ownRootPath + "/";
+
+ this.running = true;
+ final Thread backgroundThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ // create service registration properties
+ final Dictionary<String, Object> props = new Hashtable<String, Object>();
+ props.put(Constants.SERVICE_VENDOR, "The Apache Software Foundation");
+
+ // listen for all resource added OSGi events in the DEA area
+ props.put(EventConstants.EVENT_TOPIC, SlingConstants.TOPIC_RESOURCE_ADDED);
+ props.put(EventConstants.EVENT_FILTER, "(path=" + rootPath + "/*)");
+
+ final ServiceRegistration<?> reg =
+ bundleContext.registerService(new String[] {EventHandler.class.getName()},
+ DistributedEventSender.this, props);
+
+ DistributedEventSender.this.serviceRegistration = reg;
+
+ try {
+ runInBackground();
+ } catch (Throwable t) { //NOSONAR
+ logger.error("Background thread stopped with exception: " + t.getMessage(), t);
+ running = false;
+ }
+ }
+ });
+ backgroundThread.start();
+ }
+
+ /**
+ * Deactivate this component.
+ */
+ public void stop() {
+ if ( this.serviceRegistration != null ) {
+ this.serviceRegistration.unregister();
+ this.serviceRegistration = null;
+ }
+ // stop background threads by putting empty objects into the queue
+ this.running = false;
+ try {
+ this.queue.put("");
+ } catch (final InterruptedException e) {
+ this.ignoreException(e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Read an event from the resource
+ * @return The event object or <code>null</code>
+ */
+ private Event readEvent(final Resource eventResource) {
+ try {
+ final ValueMap vm = ResourceHelper.getValueMap(eventResource);
+ final String topic = vm.get(EventConstants.EVENT_TOPIC, String.class);
+ if ( topic == null ) {
+ // no topic should never happen as we check the resource type before
+ logger.error("Unable to read distributed event from " + eventResource.getPath() + " : no topic property available.");
+ } else {
+ final Map<String, Object> properties = ResourceHelper.cloneValueMap(vm);
+ // only send event if there are no read errors, otherwise discard it
+ @SuppressWarnings("unchecked")
+ final List<Exception> readErrorList = (List<Exception>) properties.remove(ResourceHelper.PROPERTY_MARKER_READ_ERROR_LIST);
+ if ( readErrorList == null ) {
+ properties.remove(EventConstants.EVENT_TOPIC);
+ properties.remove(DEAConstants.PROPERTY_DISTRIBUTE);
+ final Object oldRT = properties.remove("event.dea." + ResourceResolver.PROPERTY_RESOURCE_TYPE);
+ if ( oldRT != null ) {
+ properties.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, oldRT);
+ } else {
+ properties.remove(ResourceResolver.PROPERTY_RESOURCE_TYPE);
+ }
+ try {
+ final Event event = new Event(topic, properties);
+ return event;
+ } catch (final IllegalArgumentException iae) {
+ // this exception occurs if the topic is not correct (it should never happen,
+ // but you never know)
+ logger.error("Unable to read event: " + iae.getMessage(), iae);
+ }
+ } else {
+ for(final Exception e : readErrorList) {
+ logger.warn("Unable to read distributed event from " + eventResource.getPath(), e);
+ }
+ }
+ }
+ } catch (final InstantiationException ie) {
+ // something happened with the resource in the meantime
+ this.ignoreException(ie);
+ }
+ return null;
+ }
+
+ /**
+ * Background thread
+ */
+ private void runInBackground() {
+ while ( this.running ) {
+ // so let's wait/get the next event from the queue
+ String path = null;
+ try {
+ path = this.queue.take();
+ } catch (final InterruptedException e) {
+ this.ignoreException(e);
+ Thread.currentThread().interrupt();
+ this.running = false;
+ }
+ if ( path != null && path.length() > 0 && this.running ) {
+ ResourceResolver resolver = null;
+ try {
+ resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+ final Resource eventResource = resolver.getResource(path);
+ if ( DistributedEventAdminImpl.RESOURCE_TYPE_EVENT.equals(eventResource.getResourceType())) {
+ final Event e = this.readEvent(eventResource);
+ if ( e != null ) {
+ // we check event admin as processing is async
+ final EventAdmin localEA = this.eventAdmin;
+ if ( localEA != null ) {
+ localEA.postEvent(e);
+ } else {
+ this.logger.error("Unable to post event as no event admin is available.");
+ }
+ }
+ }
+ } catch (final LoginException ex) {
+ this.logger.error("Exception during creation of resource resolver.", ex);
+ } finally {
+ if ( resolver != null ) {
+ resolver.close();
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
+ */
+ @Override
+ public void handleEvent(final Event event) {
+ final String path = (String) event.getProperty(SlingConstants.PROPERTY_PATH);
+ if ( !path.startsWith(this.ownRootPathWithSlash) ) {
+ try {
+ this.queue.put(path);
+ } catch (final InterruptedException ex) {
+ this.ignoreException(ex);
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /**
+ * Helper method which just logs the exception in debug mode.
+ * @param e
+ */
+ private void ignoreException(final Exception e) {
+ if ( this.logger.isDebugEnabled() ) {
+ this.logger.debug("Ignored exception " + e.getMessage(), e);
+ }
+ }
+}
diff --git a/src/main/java/org/apache/sling/event/dea/impl/ResourceHelper.java b/src/main/java/org/apache/sling/event/dea/impl/ResourceHelper.java
new file mode 100644
index 0000000..0694b74
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/dea/impl/ResourceHelper.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.dea.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ValueMap;
+
+public abstract class ResourceHelper {
+
+ public static final String PROPERTY_MARKER_READ_ERROR_LIST = ResourceHelper.class.getName() + "/ReadErrorList";
+
+ public static Map<String, Object> cloneValueMap(final ValueMap vm) throws InstantiationException {
+ List<Exception> hasReadError = null;
+ try {
+ final Map<String, Object> result = new HashMap<String, Object>(vm);
+ for(final Map.Entry<String, Object> entry : result.entrySet()) {
+ if ( entry.getValue() instanceof InputStream ) {
+ final Object value = vm.get(entry.getKey(), Serializable.class);
+ if ( value != null ) {
+ entry.setValue(value);
+ } else {
+ if ( hasReadError == null ) {
+ hasReadError = new ArrayList<Exception>();
+ }
+ final int count = hasReadError.size();
+ // let's find out which class might be missing
+ ObjectInputStream ois = null;
+ try {
+ ois = new ObjectInputStream((InputStream)entry.getValue());
+ ois.readObject();
+ } catch (final ClassNotFoundException cnfe) {
+ hasReadError.add(new Exception("Unable to deserialize property '" + entry.getKey() + "'", cnfe));
+ } catch (final IOException ioe) {
+ hasReadError.add(new Exception("Unable to deserialize property '" + entry.getKey() + "'", ioe));
+ } finally {
+ if ( ois != null ) {
+ try {
+ ois.close();
+ } catch (IOException ignore) {
+ // ignore
+ }
+ }
+ }
+ if ( hasReadError.size() == count ) {
+ hasReadError.add(new Exception("Unable to deserialize property '" + entry.getKey() + "'"));
+ }
+ }
+ }
+ }
+ if ( hasReadError != null ) {
+ result.put(PROPERTY_MARKER_READ_ERROR_LIST, hasReadError);
+ }
+ return result;
+ } catch ( final IllegalArgumentException iae) {
+ // the JCR implementation might throw an IAE if something goes wrong
+ throw (InstantiationException)new InstantiationException(iae.getMessage()).initCause(iae);
+ }
+ }
+
+ public static ValueMap getValueMap(final Resource resource) throws InstantiationException {
+ final ValueMap vm = resource.getValueMap();
+ // trigger full loading
+ try {
+ vm.size();
+ } catch ( final IllegalArgumentException iae) {
+ // the JCR implementation might throw an IAE if something goes wrong
+ throw (InstantiationException)new InstantiationException(iae.getMessage()).initCause(iae);
+ }
+ return vm;
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/sling/event/dea/package-info.java b/src/main/java/org/apache/sling/event/dea/package-info.java
new file mode 100644
index 0000000..9495421
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/dea/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+@Version("1.0.0")
+package org.apache.sling.event.dea;
+
+import aQute.bnd.annotation.Version;
+
diff --git a/src/test/java/org/apache/sling/event/dea/impl/DistributingEventHandlerTest.java b/src/test/java/org/apache/sling/event/dea/impl/DistributingEventHandlerTest.java
new file mode 100644
index 0000000..3547300
--- /dev/null
+++ b/src/test/java/org/apache/sling/event/dea/impl/DistributingEventHandlerTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.dea.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.List;
+
+import org.apache.sling.api.SlingConstants;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.event.dea.DEAConstants;
+import org.apache.sling.settings.SlingSettingsService;
+import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
+import org.apache.sling.testing.resourceresolver.MockResourceResolverFactoryOptions;
+import org.junit.After;
+import org.junit.Before;
+import org.mockito.Mockito;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+
+public class DistributingEventHandlerTest {
+
+ private static final String TOPIC_PREFIX = "write/";
+
+ private DistributedEventReceiver receiver;
+
+ private DistributedEventSender sender;
+
+ private static final String MY_APP_ID = "1234";
+
+ private static final String OTHER_APP_ID = "5678";
+
+ private final List<Event> events = Collections.synchronizedList(new ArrayList<Event>());
+
+ @SuppressWarnings("unchecked")
+ @Before
+ public void setup() throws Exception {
+ final BundleContext bc = Mockito.mock(BundleContext.class);
+ Mockito.when(bc.registerService(Mockito.any(String[].class),
+ Mockito.any(),
+ Mockito.any(Dictionary.class))).thenReturn(null);
+
+ final SlingSettingsService otherSettings = Mockito.mock(SlingSettingsService.class);
+ Mockito.when(otherSettings.getSlingId()).thenReturn(OTHER_APP_ID);
+
+ final EventAdmin ea = new EventAdmin() {
+
+ @Override
+ public void sendEvent(final Event event) {
+ this.postEvent(event);
+ }
+
+ @Override
+ public void postEvent(final Event event) {
+ final String topic = event.getTopic();
+ if ( topic.equals(SlingConstants.TOPIC_RESOURCE_ADDED) ) {
+ sender.handleEvent(event);
+ } else if ( topic.startsWith(TOPIC_PREFIX) ) {
+ events.add(event);
+ }
+ }
+ };
+ final MockResourceResolverFactoryOptions opts = new MockResourceResolverFactoryOptions();
+ opts.setEventAdmin(ea);
+ final ResourceResolverFactory factory = new MockResourceResolverFactory(opts);
+
+ this.sender = new DistributedEventSender(bc, DistributedEventAdminImpl.DEFAULT_REPOSITORY_PATH,
+ DistributedEventAdminImpl.DEFAULT_REPOSITORY_PATH + "/" + MY_APP_ID, factory, ea);
+
+ this.receiver = new DistributedEventReceiver(bc, DistributedEventAdminImpl.DEFAULT_REPOSITORY_PATH,
+ DistributedEventAdminImpl.DEFAULT_REPOSITORY_PATH + "/" + OTHER_APP_ID, 15, factory, otherSettings);
+ }
+
+ @After
+ public void cleanup() {
+ if ( this.sender != null ) {
+ this.sender.stop();
+ this.sender = null;
+ }
+ if ( this.receiver != null ) {
+ this.receiver.stop();
+ this.receiver = null;
+ }
+ }
+
+ @org.junit.Test(timeout=5000) public void testSendEvent() throws Exception {
+ this.events.clear();
+
+ final String VALUE = "some value";
+ final String topic = TOPIC_PREFIX + "event/test";
+ final Dictionary<String, Object> props = new Hashtable<String, Object>();
+ props.put("a property", VALUE);
+ final Event e = new Event(topic, props);
+ this.receiver.handleEvent(e);
+
+ while ( this.events.size() == 0 ) {
+ Thread.sleep(5);
+ }
+ final Event receivedEvent = this.events.get(0);
+
+ assertEquals(topic, receivedEvent.getTopic());
+ assertEquals(OTHER_APP_ID, receivedEvent.getProperty(DEAConstants.PROPERTY_APPLICATION));
+ assertEquals(VALUE, receivedEvent.getProperty("a property"));
+ assertNull(receivedEvent.getProperty(ResourceResolver.PROPERTY_RESOURCE_TYPE));
+
+ this.events.clear();
+ }
+
+ @org.junit.Test(timeout=5000) public void testSendEventPlusAppId() throws Exception {
+ this.events.clear();
+
+ final String VALUE = "some value";
+ final String topic = TOPIC_PREFIX + "event/test";
+ final Dictionary<String, Object> props = new Hashtable<String, Object>();
+ props.put("a property", "some value");
+ // now we check if the application id is handled correctly
+ props.put(DEAConstants.PROPERTY_APPLICATION, "foo");
+
+ final Event e = new Event(topic, props);
+ this.receiver.handleEvent(e);
+
+ while ( this.events.size() == 0 ) {
+ Thread.sleep(5);
+ }
+ final Event receivedEvent = this.events.get(0);
+
+ assertEquals(topic, receivedEvent.getTopic());
+ assertEquals(OTHER_APP_ID, receivedEvent.getProperty(DEAConstants.PROPERTY_APPLICATION));
+ assertEquals(VALUE, receivedEvent.getProperty("a property"));
+ assertNull(receivedEvent.getProperty(ResourceResolver.PROPERTY_RESOURCE_TYPE));
+
+ this.events.clear();
+ }
+
+ @org.junit.Test(timeout=5000) public void testSendEventWithResourceType() throws Exception {
+ this.events.clear();
+
+ final String topic = TOPIC_PREFIX + "event/test";
+ final String RT = "my:resourceType";
+
+ final Dictionary<String, Object> props = new Hashtable<String, Object>();
+ props.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, RT);
+
+ final Event e = new Event(topic, props);
+ this.receiver.handleEvent(e);
+
+ while ( this.events.size() == 0 ) {
+ Thread.sleep(5);
+ }
+ final Event receivedEvent = this.events.get(0);
+
+ assertEquals(topic, receivedEvent.getTopic());
+ assertEquals(OTHER_APP_ID, receivedEvent.getProperty(DEAConstants.PROPERTY_APPLICATION));
+ assertEquals(RT, receivedEvent.getProperty(ResourceResolver.PROPERTY_RESOURCE_TYPE));
+ assertNull(receivedEvent.getProperty("event.dea." + ResourceResolver.PROPERTY_RESOURCE_TYPE));
+
+ this.events.clear();
+ }
+}
--
To stop receiving notification emails like this one, please contact
"commits@sling.apache.org" <co...@sling.apache.org>.