You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by ro...@apache.org on 2017/11/07 09:33:26 UTC

[sling-org-apache-sling-event-dea] 01/06: SLING-3882 : Move the distributed event admin into a separate project

This is an automated email from the ASF dual-hosted git repository.

rombert pushed a commit to annotated tag org.apache.sling.event.dea-1.0.0
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-event-dea.git

commit d715c2e84394f60948f81e8db14d483f519048e2
Author: Carsten Ziegeler <cz...@apache.org>
AuthorDate: Tue Aug 26 08:24:57 2014 +0000

    SLING-3882 : Move the distributed event admin into a separate project
    
    git-svn-id: https://svn.apache.org/repos/asf/sling/trunk/bundles/extensions/dea@1620527 13f79535-47bb-0310-9956-ffa450edef68
---
 pom.xml                                            | 148 +++++++
 .../org/apache/sling/event/dea/DEAConstants.java   |  47 +++
 .../event/dea/impl/DistributedEventAdminImpl.java  | 102 +++++
 .../event/dea/impl/DistributedEventReceiver.java   | 438 +++++++++++++++++++++
 .../event/dea/impl/DistributedEventSender.java     | 243 ++++++++++++
 .../sling/event/dea/impl/ResourceHelper.java       |  96 +++++
 .../org/apache/sling/event/dea/package-info.java   |  24 ++
 .../dea/impl/DistributingEventHandlerTest.java     | 182 +++++++++
 8 files changed, 1280 insertions(+)

diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..afeacbe
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,148 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.sling</groupId>
+        <artifactId>sling</artifactId>
+        <version>19</version>
+        <relativePath>../../../parent/pom.xml</relativePath>
+    </parent>
+
+    <artifactId>org.apache.sling.event.dea</artifactId>
+    <packaging>bundle</packaging>
+    <version>0.0.1-SNAPSHOT</version>
+
+    <name>Apache Sling Distributed Event Admin</name>
+    <description>
+        Support distributing events through the OSGi event admin.
+    </description>
+
+    <scm>
+        <connection>scm:svn:http://svn.apache.org/repos/asf/sling/trunk/bundles/extensions/dea</connection>
+        <developerConnection>scm:svn:https://svn.apache.org/repos/asf/sling/trunk/bundles/extensions/dea</developerConnection>
+        <url>http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/dea</url>
+    </scm>
+
+    <properties>
+        <sling.java.version>6</sling.java.version>
+    </properties>
+
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-scr-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <extensions>true</extensions>
+            </plugin>
+        </plugins>
+    </build>
+    <reporting>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <configuration>
+                    <excludePackageNames>
+                        org.apache.sling.event.dea.impl
+                    </excludePackageNames>
+                </configuration>
+            </plugin>
+        </plugins>
+    </reporting>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.felix</groupId>
+            <artifactId>org.apache.felix.scr.annotations</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.discovery.api</artifactId>
+            <version>1.0.0</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.core</artifactId>
+            <version>5.0.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.compendium</artifactId>
+            <version>5.0.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.settings</artifactId>
+            <version>1.0.0</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.commons.osgi</artifactId>
+            <version>2.2.0</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.api</artifactId>
+            <version>2.7.1-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+      <!-- Testing -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit-addons</groupId>
+            <artifactId>junit-addons</artifactId>
+            <version>1.4</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>1.9.5</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+        </dependency>
+         <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.testing.resourceresolver-mock</artifactId>
+            <version>0.2.0</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+</project>
diff --git a/src/main/java/org/apache/sling/event/dea/DEAConstants.java b/src/main/java/org/apache/sling/event/dea/DEAConstants.java
new file mode 100644
index 0000000..6ea8718
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/dea/DEAConstants.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.dea;
+
+
+/**
+ * The <code>DEAConstants</code> provides some constants for
+ * handling distributed OSGi events.
+ * <p>
+ * If an event should be sent to other instances, the event
+ * property {@link #PROPERTY_DISTRIBUTE} should be set to
+ * an empty string.
+ * <p>
+ * An event, regardless if distributed or not, should never be
+ * created with the property {@link #PROPERTY_APPLICATION}. In
+ * addition properties starting with "event.dea." are reserved
+ * attributes of this implementation and must not be used
+ * by custom events.
+ * <p>
+ * If the event is a local event, the {@link #PROPERTY_APPLICATION}
+ * is not available. If it is available, it contains the application
+ * (Sling ID) of the instance where the event originated.
+ */
+public abstract class DEAConstants {
+
+    /** This event property indicates, if the event should be distributed in the cluster. */
+    public static final String PROPERTY_DISTRIBUTE = "event.distribute";
+
+    /** This event property specifies the application node. */
+    public static final String PROPERTY_APPLICATION = "event.application";
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/sling/event/dea/impl/DistributedEventAdminImpl.java b/src/main/java/org/apache/sling/event/dea/impl/DistributedEventAdminImpl.java
new file mode 100644
index 0000000..df128b4
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/dea/impl/DistributedEventAdminImpl.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.dea.impl;
+
+import java.util.Map;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.settings.SlingSettingsService;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.event.EventAdmin;
+
+/**
+ * This service wraps the configuration of the distributed event admin
+ * and starts the different parts.
+ */
+@Component(name="org.apache.sling.event.impl.DistributingEventHandler")
+public class DistributedEventAdminImpl {
+
+    public static final String RESOURCE_TYPE_FOLDER = "sling:Folder";
+
+    public static final String RESOURCE_TYPE_EVENT = "sling/distributed/event";
+
+    @Reference
+    private SlingSettingsService settings;
+
+    @Reference
+    private ResourceResolverFactory resourceResolverFactory;
+
+    @Reference
+    private EventAdmin eventAdmin;
+
+    /** Default repository path. */
+    public static final String DEFAULT_REPOSITORY_PATH = "/var/eventing/distribution";
+
+    /** The path where all jobs are stored. */
+    @Property(value=DEFAULT_REPOSITORY_PATH)
+    private static final String CONFIG_PROPERTY_REPOSITORY_PATH = "repository.path";
+
+    /** Default clean up time is 15 minutes. */
+    private static final int DEFAULT_CLEANUP_PERIOD = 15;
+
+    @Property(intValue=DEFAULT_CLEANUP_PERIOD)
+    private static final String CONFIG_PROPERTY_CLEANUP_PERIOD = "cleanup.period";
+
+    /** The local receiver of distributed events .*/
+    private DistributedEventReceiver receiver;
+
+    /** The local sender for distributed events. */
+    private DistributedEventSender sender;
+
+    @Activate
+    protected void activate(final BundleContext bundleContext, final Map<String, Object> props) {
+        final int cleanupPeriod = PropertiesUtil.toInteger(props.get(CONFIG_PROPERTY_CLEANUP_PERIOD), DEFAULT_CLEANUP_PERIOD);
+        final String rootPath = PropertiesUtil.toString(props.get(
+                CONFIG_PROPERTY_REPOSITORY_PATH), DEFAULT_REPOSITORY_PATH);
+        final String ownRootPath = rootPath.concat("/").concat(settings.getSlingId());
+
+        this.receiver = new DistributedEventReceiver(bundleContext,
+                rootPath,
+                ownRootPath,
+                cleanupPeriod,
+                this.resourceResolverFactory, this.settings);
+        this.sender = new DistributedEventSender(bundleContext,
+                              rootPath,
+                              ownRootPath,
+                              this.resourceResolverFactory, this.eventAdmin);
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        if ( this.receiver != null ) {
+            this.receiver.stop();
+            this.receiver = null;
+        }
+        if ( this.sender != null ) {
+            this.sender.stop();
+            this.sender = null;
+        }
+    }
+}
diff --git a/src/main/java/org/apache/sling/event/dea/impl/DistributedEventReceiver.java b/src/main/java/org/apache/sling/event/dea/impl/DistributedEventReceiver.java
new file mode 100644
index 0000000..58ed940
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/dea/impl/DistributedEventReceiver.java
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.dea.impl;
+
+import java.util.Calendar;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.api.resource.ResourceUtil;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyEvent.Type;
+import org.apache.sling.discovery.TopologyEventListener;
+import org.apache.sling.event.dea.DEAConstants;
+import org.apache.sling.settings.SlingSettingsService;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is the distributed event receiver.
+ * It listens for all distributable events and stores them in the
+ * repository for other cluster instance to pick them up.
+ * <p>
+ * This component is scheduled to run some clean up tasks in the
+ * background periodically.
+ * <p>
+ */
+public class DistributedEventReceiver
+    implements EventHandler, Runnable, TopologyEventListener {
+
+    /** Special topic to stop the queue. */
+    private static final String TOPIC_STOPPED = "org/apache/sling/event/dea/impl/STOPPED";
+
+    /** Logger */
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    /** A local queue for writing received events into the repository. */
+    private final BlockingQueue<Event> writeQueue = new LinkedBlockingQueue<Event>();
+
+    /** The resource resolver factory. */
+    private final ResourceResolverFactory resourceResolverFactory;
+
+    /** The current instance id. */
+    private final String slingId;
+
+    /** The root path for events . */
+    private final String rootPath;
+
+    /** The root path for events written by this instance. */
+    private final String ownRootPath;
+
+    /** The cleanup period. */
+    private final int cleanupPeriod;
+
+    /** Resolver used for writing. */
+    private volatile ResourceResolver writerResolver;
+
+    /** Is the background task still running? */
+    private volatile boolean running;
+
+    /** The current instances if this is the leader. */
+    private volatile Set<String> instances;
+
+    /** The service registration. */
+    private volatile ServiceRegistration<?> serviceRegistration;
+
+    public DistributedEventReceiver(final BundleContext bundleContext,
+            final String rootPath,
+            final String ownRootPath,
+            final int cleanupPeriod,
+            final ResourceResolverFactory rrFactory,
+            final SlingSettingsService settings) {
+        this.rootPath = rootPath;
+        this.ownRootPath = ownRootPath;
+        this.resourceResolverFactory = rrFactory;
+        this.slingId = settings.getSlingId();
+        this.cleanupPeriod = cleanupPeriod;
+
+        this.running = true;
+        // start writer thread
+        final Thread writerThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                // create service registration properties
+                final Dictionary<String, Object> props = new Hashtable<String, Object>();
+                props.put(Constants.SERVICE_VENDOR, "The Apache Software Foundation");
+
+                // listen for all OSGi events with the distributable flag
+                props.put(EventConstants.EVENT_TOPIC, "*");
+                props.put(EventConstants.EVENT_FILTER, "(" + DEAConstants.PROPERTY_DISTRIBUTE + "=*)");
+                // schedule this service every 30 minutes
+                props.put("scheduler.period", 1800L);
+                props.put("scheduler.concurrent", Boolean.FALSE);
+
+                final ServiceRegistration<?> reg =
+                        bundleContext.registerService(new String[] {EventHandler.class.getName(),
+                                                                   Runnable.class.getName(),
+                                                                   TopologyEventListener.class.getName()},
+                                                      DistributedEventReceiver.this, props);
+
+                DistributedEventReceiver.this.serviceRegistration = reg;
+
+                try {
+                    writerResolver = resourceResolverFactory.getAdministrativeResourceResolver(null);
+                    ResourceUtil.getOrCreateResource(writerResolver,
+                            ownRootPath,
+                            DistributedEventAdminImpl.RESOURCE_TYPE_FOLDER,
+                            DistributedEventAdminImpl.RESOURCE_TYPE_FOLDER,
+                            true);
+                } catch (final Exception e) {
+                    // there is nothing we can do except log!
+                    logger.error("Error during resource resolver creation.", e);
+                    running = false;
+                }
+                try {
+                    processWriteQueue();
+                } catch (final Throwable t) { //NOSONAR
+                    logger.error("Writer thread stopped with exception: " + t.getMessage(), t);
+                    running = false;
+                }
+                if ( writerResolver != null ) {
+                    writerResolver.close();
+                    writerResolver = null;
+                }
+            }
+        });
+        writerThread.start();
+    }
+
+    /**
+     * Deactivate this component.
+     */
+    public void stop() {
+        if ( this.serviceRegistration != null ) {
+            this.serviceRegistration.unregister();
+            this.serviceRegistration = null;
+        }
+        // stop background threads by putting empty objects into the queue
+        this.running = false;
+        try {
+            this.writeQueue.put(new Event(TOPIC_STOPPED, (Dictionary<String, Object>)null));
+        } catch (final InterruptedException e) {
+            this.ignoreException(e);
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Background thread writing events into the queue.
+     */
+    private void processWriteQueue() {
+        while ( this.running ) {
+            // so let's wait/get the next event from the queue
+            Event event = null;
+            try {
+                event = this.writeQueue.take();
+            } catch (final InterruptedException e) {
+                this.ignoreException(e);
+                Thread.currentThread().interrupt();
+                this.running = false;
+            }
+            if ( event != null && this.running ) {
+                try {
+                    this.writeEvent(event);
+                } catch (final Exception e) {
+                    this.logger.error("Exception during writing the event to the resource tree.", e);
+                }
+            }
+        }
+    }
+
+    /** Counter for events. */
+    private final AtomicLong eventCounter = new AtomicLong(0);
+
+    /**
+     * Write an event to the resource tree.
+     * @param event The event
+     * @throws PersistenceException
+     */
+    private void writeEvent(final Event event)
+    throws PersistenceException {
+        final Calendar now = Calendar.getInstance();
+
+        final StringBuilder sb = new StringBuilder(this.ownRootPath);
+        sb.append('/');
+        sb.append(now.get(Calendar.YEAR));
+        sb.append('/');
+        sb.append(now.get(Calendar.MONTH) + 1);
+        sb.append('/');
+        sb.append(now.get(Calendar.DAY_OF_MONTH));
+        sb.append('/');
+        sb.append(now.get(Calendar.HOUR_OF_DAY));
+        sb.append('/');
+        sb.append(now.get(Calendar.MINUTE));
+        sb.append('/');
+        sb.append("event-");
+        sb.append(String.valueOf(eventCounter.getAndIncrement()));
+
+        // create properties
+        final Map<String, Object> properties = new HashMap<String, Object>();
+
+        final String[] propNames = event.getPropertyNames();
+        if ( propNames != null && propNames.length > 0 ) {
+            for(final String propName : propNames) {
+                properties.put(propName, event.getProperty(propName));
+            }
+        }
+
+        properties.remove(DEAConstants.PROPERTY_DISTRIBUTE);
+        properties.put(EventConstants.EVENT_TOPIC, event.getTopic());
+        properties.put(DEAConstants.PROPERTY_APPLICATION, this.slingId);
+        final Object oldRT = properties.get(ResourceResolver.PROPERTY_RESOURCE_TYPE);
+        if ( oldRT != null ) {
+            properties.put("event.dea." + ResourceResolver.PROPERTY_RESOURCE_TYPE, oldRT);
+        }
+        properties.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, DistributedEventAdminImpl.RESOURCE_TYPE_EVENT);
+        ResourceUtil.getOrCreateResource(this.writerResolver,
+                sb.toString(),
+                properties,
+                DistributedEventAdminImpl.RESOURCE_TYPE_FOLDER,
+                true);
+    }
+
+    /**
+     * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
+     */
+    @Override
+    public void handleEvent(final Event event) {
+        try {
+            this.writeQueue.put(event);
+        } catch (final InterruptedException ex) {
+            this.ignoreException(ex);
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Helper method which just logs the exception in debug mode.
+     * @param e
+     */
+    private void ignoreException(final Exception e) {
+        if ( this.logger.isDebugEnabled() ) {
+            this.logger.debug("Ignored exception " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * This method is invoked periodically.
+     * @see java.lang.Runnable#run()
+     */
+    @Override
+    public void run() {
+        this.cleanUpObsoleteInstances();
+        this.cleanUpObsoleteEvents();
+    }
+
+    private void cleanUpObsoleteInstances() {
+        final Set<String> slingIds = this.instances;
+        if ( slingIds != null ) {
+            this.instances = null;
+            this.logger.debug("Checking for old instance trees for distributed events.");
+            ResourceResolver resolver = null;
+            try {
+                resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+
+                final Resource baseResource = resolver.getResource(this.rootPath);
+                // sanity check - should never be null
+                if ( baseResource != null ) {
+                    final ResourceUtil.BatchResourceRemover brr = ResourceUtil.getBatchResourceResourceRemover(50);
+                    final Iterator<Resource> iter = baseResource.listChildren();
+                    while ( iter.hasNext() ) {
+                        final Resource rootResource = iter.next();
+                        if ( !slingIds.contains(rootResource.getName()) ) {
+                            brr.delete(rootResource);
+                        }
+                    }
+                    // final commit for outstanding deletes
+                    resolver.commit();
+                }
+
+            } catch (final PersistenceException pe) {
+                // in the case of an error, we just log this as a warning
+                this.logger.warn("Exception during job resource tree cleanup.", pe);
+            } catch (final LoginException ignore) {
+                this.ignoreException(ignore);
+            } finally {
+                if ( resolver != null ) {
+                    resolver.close();
+                }
+            }
+        }
+    }
+
+    private void cleanUpObsoleteEvents() {
+        if ( this.cleanupPeriod > 0 ) {
+            this.logger.debug("Cleaning up distributed events, removing all entries older than {} minutes.", this.cleanupPeriod);
+
+            ResourceResolver resolver = null;
+            try {
+                resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+                final ResourceUtil.BatchResourceRemover brr = ResourceUtil.getBatchResourceResourceRemover(50);
+
+                final Resource baseResource = resolver.getResource(this.ownRootPath);
+                // sanity check - should never be null
+                if ( baseResource != null ) {
+                    final Calendar oldDate = Calendar.getInstance();
+                    oldDate.add(Calendar.MINUTE, -1 * this.cleanupPeriod);
+
+                    // check years
+                    final int oldYear = oldDate.get(Calendar.YEAR);
+                    final Iterator<Resource> yearIter = baseResource.listChildren();
+                    while ( yearIter.hasNext() ) {
+                        final Resource yearResource = yearIter.next();
+                        final int year = Integer.valueOf(yearResource.getName());
+                        if ( year < oldYear ) {
+                            brr.delete(yearResource);
+                        } else if ( year == oldYear ) {
+
+                            // same year - check months
+                            final int oldMonth = oldDate.get(Calendar.MONTH) + 1;
+                            final Iterator<Resource> monthIter = yearResource.listChildren();
+                            while ( monthIter.hasNext() ) {
+                                final Resource monthResource = monthIter.next();
+                                final int month = Integer.valueOf(monthResource.getName());
+                                if ( month < oldMonth ) {
+                                    brr.delete(monthResource);
+                                } else if ( month == oldMonth ) {
+
+                                    // same month - check days
+                                    final int oldDay = oldDate.get(Calendar.DAY_OF_MONTH);
+                                    final Iterator<Resource> dayIter = monthResource.listChildren();
+                                    while ( dayIter.hasNext() ) {
+                                        final Resource dayResource = dayIter.next();
+                                        final int day = Integer.valueOf(dayResource.getName());
+                                        if ( day < oldDay ) {
+                                            brr.delete(dayResource);
+                                        } else if ( day == oldDay ) {
+
+                                            // same day - check hours
+                                            final int oldHour = oldDate.get(Calendar.HOUR_OF_DAY);
+                                            final Iterator<Resource> hourIter = dayResource.listChildren();
+                                            while ( hourIter.hasNext() ) {
+                                                final Resource hourResource = hourIter.next();
+                                                final int hour = Integer.valueOf(hourResource.getName());
+                                                if ( hour < oldHour ) {
+                                                    brr.delete(hourResource);
+                                                } else if ( hour == oldHour ) {
+
+                                                    // same hour - check minutes
+                                                    final int oldMinute = oldDate.get(Calendar.MINUTE);
+                                                    final Iterator<Resource> minuteIter = hourResource.listChildren();
+                                                    while ( minuteIter.hasNext() ) {
+                                                        final Resource minuteResource = minuteIter.next();
+
+                                                        final int minute = Integer.valueOf(minuteResource.getName());
+                                                        if ( minute < oldMinute ) {
+                                                            brr.delete(minuteResource);
+                                                        }
+                                                    }
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+                // final commit for outstanding resources
+                resolver.commit();
+
+            } catch (final PersistenceException pe) {
+                // in the case of an error, we just log this as a warning
+                this.logger.warn("Exception during job resource tree cleanup.", pe);
+            } catch (final LoginException ignore) {
+                this.ignoreException(ignore);
+            } finally {
+                if ( resolver != null ) {
+                    resolver.close();
+                }
+            }
+        }
+    }
+
+    /**
+     * @see org.apache.sling.discovery.TopologyEventListener#handleTopologyEvent(org.apache.sling.discovery.TopologyEvent)
+     */
+    @Override
+    public void handleTopologyEvent(final TopologyEvent event) {
+        if ( event.getType() == Type.TOPOLOGY_CHANGING ) {
+            this.instances = null;
+        } else if ( event.getType() == Type.TOPOLOGY_CHANGED || event.getType() == Type.TOPOLOGY_INIT ) {
+            if ( event.getNewView().getLocalInstance().isLeader() ) {
+                final Set<String> set = new HashSet<String>();
+                for(final InstanceDescription desc : event.getNewView().getInstances() ) {
+                    set.add(desc.getSlingId());
+                }
+                this.instances = set;
+            }
+        }
+    }
+}
+
diff --git a/src/main/java/org/apache/sling/event/dea/impl/DistributedEventSender.java b/src/main/java/org/apache/sling/event/dea/impl/DistributedEventSender.java
new file mode 100644
index 0000000..95aa021
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/dea/impl/DistributedEventSender.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.dea.impl;
+
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.sling.api.SlingConstants;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.event.dea.DEAConstants;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This event handler distributes events from other application nodes in
+ * the cluster on the current instance.
+ * <p>
+ * It's listening for resource added events in the resource tree storing the
+ * distributed events. If a new resource is added, the resource is read,
+ * converted to an event and then send using the local event admin.
+ * <p>
+ */
+public class DistributedEventSender
+    implements EventHandler {
+
+    /** Default logger. */
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    /** Is the background task still running? */
+    private volatile boolean running;
+
+    /** A local queue for serializing the event processing. */
+    private final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
+
+    private final ResourceResolverFactory resourceResolverFactory;
+
+    private final EventAdmin eventAdmin;
+
+    private final String ownRootPathWithSlash;
+
+    private volatile ServiceRegistration<?> serviceRegistration;
+
+    public DistributedEventSender(final BundleContext bundleContext,
+            final String rootPath,
+            final String ownRootPath,
+            final ResourceResolverFactory rrFactory,
+            final EventAdmin eventAdmin) {
+        this.eventAdmin = eventAdmin;
+        this.resourceResolverFactory = rrFactory;
+        this.ownRootPathWithSlash = ownRootPath + "/";
+
+        this.running = true;
+        final Thread backgroundThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                // create service registration properties
+                final Dictionary<String, Object> props = new Hashtable<String, Object>();
+                props.put(Constants.SERVICE_VENDOR, "The Apache Software Foundation");
+
+                // listen for all resource added OSGi events in the DEA area
+                props.put(EventConstants.EVENT_TOPIC, SlingConstants.TOPIC_RESOURCE_ADDED);
+                props.put(EventConstants.EVENT_FILTER, "(path=" + rootPath + "/*)");
+
+                final ServiceRegistration<?> reg =
+                        bundleContext.registerService(new String[] {EventHandler.class.getName()},
+                        DistributedEventSender.this, props);
+
+                DistributedEventSender.this.serviceRegistration = reg;
+
+                try {
+                    runInBackground();
+                } catch (Throwable t) { //NOSONAR
+                    logger.error("Background thread stopped with exception: " + t.getMessage(), t);
+                    running = false;
+                }
+            }
+        });
+        backgroundThread.start();
+    }
+
+    /**
+     * Deactivate this component.
+     */
+    public void stop() {
+        if ( this.serviceRegistration != null ) {
+            this.serviceRegistration.unregister();
+            this.serviceRegistration = null;
+        }
+        // stop background threads by putting empty objects into the queue
+        this.running = false;
+        try {
+            this.queue.put("");
+        } catch (final InterruptedException e) {
+            this.ignoreException(e);
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Read an event from the resource
+     * @return The event object or <code>null</code>
+     */
+    private Event readEvent(final Resource eventResource) {
+        try {
+            final ValueMap vm = ResourceHelper.getValueMap(eventResource);
+            final String topic = vm.get(EventConstants.EVENT_TOPIC, String.class);
+            if ( topic == null ) {
+                // no topic should never happen as we check the resource type before
+                logger.error("Unable to read distributed event from " + eventResource.getPath() + " : no topic property available.");
+            } else {
+                final Map<String, Object> properties = ResourceHelper.cloneValueMap(vm);
+                // only send event if there are no read errors, otherwise discard it
+                @SuppressWarnings("unchecked")
+                final List<Exception> readErrorList = (List<Exception>) properties.remove(ResourceHelper.PROPERTY_MARKER_READ_ERROR_LIST);
+                if ( readErrorList == null ) {
+                    properties.remove(EventConstants.EVENT_TOPIC);
+                    properties.remove(DEAConstants.PROPERTY_DISTRIBUTE);
+                    final Object oldRT = properties.remove("event.dea." + ResourceResolver.PROPERTY_RESOURCE_TYPE);
+                    if ( oldRT != null ) {
+                        properties.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, oldRT);
+                    } else {
+                        properties.remove(ResourceResolver.PROPERTY_RESOURCE_TYPE);
+                    }
+                    try {
+                        final Event event = new Event(topic, properties);
+                        return event;
+                    } catch (final IllegalArgumentException iae) {
+                        // this exception occurs if the topic is not correct (it should never happen,
+                        // but you never know)
+                        logger.error("Unable to read event: " + iae.getMessage(), iae);
+                    }
+                } else {
+                    for(final Exception e : readErrorList) {
+                        logger.warn("Unable to read distributed event from " + eventResource.getPath(), e);
+                    }
+                }
+            }
+        } catch (final InstantiationException ie) {
+            // something happened with the resource in the meantime
+            this.ignoreException(ie);
+        }
+        return null;
+    }
+
+    /**
+     * Background thread
+     */
+    private void runInBackground() {
+        while ( this.running ) {
+            // so let's wait/get the next event from the queue
+            String path = null;
+            try {
+                path = this.queue.take();
+            } catch (final InterruptedException e) {
+                this.ignoreException(e);
+                Thread.currentThread().interrupt();
+                this.running = false;
+            }
+            if ( path != null && path.length() > 0 && this.running ) {
+                ResourceResolver resolver = null;
+                try {
+                    resolver = this.resourceResolverFactory.getAdministrativeResourceResolver(null);
+                    final Resource eventResource = resolver.getResource(path);
+                    if ( DistributedEventAdminImpl.RESOURCE_TYPE_EVENT.equals(eventResource.getResourceType())) {
+                        final Event e = this.readEvent(eventResource);
+                        if ( e != null ) {
+                            // we check event admin as processing is async
+                            final EventAdmin localEA = this.eventAdmin;
+                            if ( localEA != null ) {
+                                localEA.postEvent(e);
+                            } else {
+                                this.logger.error("Unable to post event as no event admin is available.");
+                            }
+                        }
+                    }
+                } catch (final LoginException ex) {
+                    this.logger.error("Exception during creation of resource resolver.", ex);
+                } finally {
+                    if ( resolver != null ) {
+                        resolver.close();
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * @see org.osgi.service.event.EventHandler#handleEvent(org.osgi.service.event.Event)
+     */
+    @Override
+    public void handleEvent(final Event event) {
+        final String path = (String) event.getProperty(SlingConstants.PROPERTY_PATH);
+        if ( !path.startsWith(this.ownRootPathWithSlash) ) {
+            try {
+                this.queue.put(path);
+            } catch (final InterruptedException ex) {
+                this.ignoreException(ex);
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Helper method which just logs the exception in debug mode.
+     * @param e
+     */
+    private void ignoreException(final Exception e) {
+        if ( this.logger.isDebugEnabled() ) {
+            this.logger.debug("Ignored exception " + e.getMessage(), e);
+        }
+    }
+}
diff --git a/src/main/java/org/apache/sling/event/dea/impl/ResourceHelper.java b/src/main/java/org/apache/sling/event/dea/impl/ResourceHelper.java
new file mode 100644
index 0000000..0694b74
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/dea/impl/ResourceHelper.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.dea.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ValueMap;
+
+public abstract class ResourceHelper {
+
+    public static final String PROPERTY_MARKER_READ_ERROR_LIST = ResourceHelper.class.getName() + "/ReadErrorList";
+
+    public static Map<String, Object> cloneValueMap(final ValueMap vm) throws InstantiationException {
+        List<Exception> hasReadError = null;
+        try {
+            final Map<String, Object> result = new HashMap<String, Object>(vm);
+            for(final Map.Entry<String, Object> entry : result.entrySet()) {
+                if ( entry.getValue() instanceof InputStream ) {
+                    final Object value = vm.get(entry.getKey(), Serializable.class);
+                    if ( value != null ) {
+                        entry.setValue(value);
+                    } else {
+                        if ( hasReadError == null ) {
+                            hasReadError = new ArrayList<Exception>();
+                        }
+                        final int count = hasReadError.size();
+                        // let's find out which class might be missing
+                        ObjectInputStream ois = null;
+                        try {
+                            ois = new ObjectInputStream((InputStream)entry.getValue());
+                            ois.readObject();
+                        } catch (final ClassNotFoundException cnfe) {
+                             hasReadError.add(new Exception("Unable to deserialize property '" + entry.getKey() + "'", cnfe));
+                        } catch (final IOException ioe) {
+                            hasReadError.add(new Exception("Unable to deserialize property '" + entry.getKey() + "'", ioe));
+                        } finally {
+                            if ( ois != null ) {
+                                try {
+                                    ois.close();
+                                } catch (IOException ignore) {
+                                    // ignore
+                                }
+                            }
+                        }
+                        if ( hasReadError.size() == count ) {
+                            hasReadError.add(new Exception("Unable to deserialize property '" + entry.getKey() + "'"));
+                        }
+                    }
+                }
+            }
+            if ( hasReadError != null ) {
+                result.put(PROPERTY_MARKER_READ_ERROR_LIST, hasReadError);
+            }
+            return result;
+        } catch ( final IllegalArgumentException iae) {
+            // the JCR implementation might throw an IAE if something goes wrong
+            throw (InstantiationException)new InstantiationException(iae.getMessage()).initCause(iae);
+        }
+    }
+
+    public static ValueMap getValueMap(final Resource resource) throws InstantiationException {
+        final ValueMap vm = resource.getValueMap();
+        // trigger full loading
+        try {
+            vm.size();
+        } catch ( final IllegalArgumentException iae) {
+            // the JCR implementation might throw an IAE if something goes wrong
+            throw (InstantiationException)new InstantiationException(iae.getMessage()).initCause(iae);
+        }
+        return vm;
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/sling/event/dea/package-info.java b/src/main/java/org/apache/sling/event/dea/package-info.java
new file mode 100644
index 0000000..9495421
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/dea/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+@Version("1.0.0")
+package org.apache.sling.event.dea;
+
+import aQute.bnd.annotation.Version;
+
diff --git a/src/test/java/org/apache/sling/event/dea/impl/DistributingEventHandlerTest.java b/src/test/java/org/apache/sling/event/dea/impl/DistributingEventHandlerTest.java
new file mode 100644
index 0000000..3547300
--- /dev/null
+++ b/src/test/java/org/apache/sling/event/dea/impl/DistributingEventHandlerTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.dea.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.List;
+
+import org.apache.sling.api.SlingConstants;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.event.dea.DEAConstants;
+import org.apache.sling.settings.SlingSettingsService;
+import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
+import org.apache.sling.testing.resourceresolver.MockResourceResolverFactoryOptions;
+import org.junit.After;
+import org.junit.Before;
+import org.mockito.Mockito;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+
+public class DistributingEventHandlerTest {
+
+    private static final String TOPIC_PREFIX = "write/";
+
+    private DistributedEventReceiver receiver;
+
+    private DistributedEventSender sender;
+
+    private static final String MY_APP_ID = "1234";
+
+    private static final String OTHER_APP_ID = "5678";
+
+    private final List<Event> events = Collections.synchronizedList(new ArrayList<Event>());
+
+    @SuppressWarnings("unchecked")
+    @Before
+    public void setup() throws Exception {
+        final BundleContext bc = Mockito.mock(BundleContext.class);
+        Mockito.when(bc.registerService(Mockito.any(String[].class),
+                Mockito.any(),
+                Mockito.any(Dictionary.class))).thenReturn(null);
+
+        final SlingSettingsService otherSettings = Mockito.mock(SlingSettingsService.class);
+        Mockito.when(otherSettings.getSlingId()).thenReturn(OTHER_APP_ID);
+
+        final EventAdmin ea = new EventAdmin() {
+
+            @Override
+            public void sendEvent(final Event event) {
+                this.postEvent(event);
+            }
+
+            @Override
+            public void postEvent(final Event event) {
+                final String topic = event.getTopic();
+                if ( topic.equals(SlingConstants.TOPIC_RESOURCE_ADDED) ) {
+                    sender.handleEvent(event);
+                } else if ( topic.startsWith(TOPIC_PREFIX) ) {
+                    events.add(event);
+                }
+            }
+        };
+        final MockResourceResolverFactoryOptions opts = new MockResourceResolverFactoryOptions();
+        opts.setEventAdmin(ea);
+        final ResourceResolverFactory factory = new MockResourceResolverFactory(opts);
+
+        this.sender = new DistributedEventSender(bc, DistributedEventAdminImpl.DEFAULT_REPOSITORY_PATH,
+                DistributedEventAdminImpl.DEFAULT_REPOSITORY_PATH + "/" + MY_APP_ID, factory, ea);
+
+        this.receiver = new DistributedEventReceiver(bc, DistributedEventAdminImpl.DEFAULT_REPOSITORY_PATH,
+                DistributedEventAdminImpl.DEFAULT_REPOSITORY_PATH + "/" + OTHER_APP_ID, 15, factory, otherSettings);
+    }
+
+    @After
+    public void cleanup() {
+        if ( this.sender != null ) {
+            this.sender.stop();
+            this.sender = null;
+        }
+        if ( this.receiver != null ) {
+            this.receiver.stop();
+            this.receiver = null;
+        }
+    }
+
+    @org.junit.Test(timeout=5000) public void testSendEvent() throws Exception {
+        this.events.clear();
+
+        final String VALUE = "some value";
+        final String topic = TOPIC_PREFIX + "event/test";
+        final Dictionary<String, Object> props = new Hashtable<String, Object>();
+        props.put("a property", VALUE);
+        final Event e = new Event(topic, props);
+        this.receiver.handleEvent(e);
+
+        while ( this.events.size() == 0 ) {
+            Thread.sleep(5);
+        }
+        final Event receivedEvent = this.events.get(0);
+
+        assertEquals(topic, receivedEvent.getTopic());
+        assertEquals(OTHER_APP_ID, receivedEvent.getProperty(DEAConstants.PROPERTY_APPLICATION));
+        assertEquals(VALUE, receivedEvent.getProperty("a property"));
+        assertNull(receivedEvent.getProperty(ResourceResolver.PROPERTY_RESOURCE_TYPE));
+
+        this.events.clear();
+    }
+
+    @org.junit.Test(timeout=5000) public void testSendEventPlusAppId() throws Exception {
+        this.events.clear();
+
+        final String VALUE = "some value";
+        final String topic = TOPIC_PREFIX + "event/test";
+        final Dictionary<String, Object> props = new Hashtable<String, Object>();
+        props.put("a property", "some value");
+        // now we check if the application id is handled correctly
+        props.put(DEAConstants.PROPERTY_APPLICATION, "foo");
+
+        final Event e = new Event(topic, props);
+        this.receiver.handleEvent(e);
+
+        while ( this.events.size() == 0 ) {
+            Thread.sleep(5);
+        }
+        final Event receivedEvent = this.events.get(0);
+
+        assertEquals(topic, receivedEvent.getTopic());
+        assertEquals(OTHER_APP_ID, receivedEvent.getProperty(DEAConstants.PROPERTY_APPLICATION));
+        assertEquals(VALUE, receivedEvent.getProperty("a property"));
+        assertNull(receivedEvent.getProperty(ResourceResolver.PROPERTY_RESOURCE_TYPE));
+
+        this.events.clear();
+    }
+
+    @org.junit.Test(timeout=5000) public void testSendEventWithResourceType() throws Exception {
+        this.events.clear();
+
+        final String topic = TOPIC_PREFIX + "event/test";
+        final String RT = "my:resourceType";
+
+        final Dictionary<String, Object> props = new Hashtable<String, Object>();
+        props.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, RT);
+
+        final Event e = new Event(topic, props);
+        this.receiver.handleEvent(e);
+
+        while ( this.events.size() == 0 ) {
+            Thread.sleep(5);
+        }
+        final Event receivedEvent = this.events.get(0);
+
+        assertEquals(topic, receivedEvent.getTopic());
+        assertEquals(OTHER_APP_ID, receivedEvent.getProperty(DEAConstants.PROPERTY_APPLICATION));
+        assertEquals(RT, receivedEvent.getProperty(ResourceResolver.PROPERTY_RESOURCE_TYPE));
+        assertNull(receivedEvent.getProperty("event.dea." + ResourceResolver.PROPERTY_RESOURCE_TYPE));
+
+        this.events.clear();
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
"commits@sling.apache.org" <co...@sling.apache.org>.