You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by ti...@apache.org on 2020/10/01 12:09:08 UTC

[aries-typedevent] branch main updated (6635fbe -> 07b5f41)

This is an automated email from the ASF dual-hosted git repository.

timothyjward pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/aries-typedevent.git.


    from 6635fbe  Use the Component DSL to run the whiteboard, contribute integration tests from the BRAIN-IoT prototype
     new c9f16c1  Correct the JavaDoc in unit tests
     new 07b5f41  Initial support for remote events using OSGi Remote Services

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../typedevent/bus/impl/TypedEventBusImpl.java     |  10 +-
 .../typedevent/bus/impl/TypedEventBusImplTest.java | 103 +++---
 .../typedevent/bus/osgi/FilterIntegrationTest.java |   8 +-
 .../osgi/UnhandledEventHandlerIntegrationTest.java |   3 +-
 .../org.apache.aries.typedevent.remote.api/pom.xml |  41 +++
 .../aries/typedevent/remote/api/FilterDTO.java     |  16 +-
 .../remote/api/RemoteEventConstants.java           |  49 +++
 .../typedevent/remote/api/RemoteEventMonitor.java  | 107 +++++++
 .../aries/typedevent/remote/api/RemoteEvents.java  |  12 +-
 .../typedevent/remote/api/RemoteMonitorEvent.java  |  15 +-
 .../aries/typedevent/remote/api/package-info.java  |   9 +-
 .../pom.xml                                        |  31 +-
 .../impl/LocalEventBusForwarder.java               | 191 +++++++++++
 .../remoteservices/impl/RemoteEventBusImpl.java    | 213 +++++++++++++
 .../impl/RemoteServiceEventsActivator.java         | 332 ++++++++++++++++++++
 .../remote/remoteservices/spi/RemoteEventBus.java  |  44 ++-
 .../remote/remoteservices/spi/package-info.java    |   9 +-
 .../remote/remoteservices}/common/TestEvent.java   |   2 +-
 .../impl/RemoteEventBusImplTest.java               | 151 +++++++++
 .../osgi/AbstractIntegrationTest.java              |  16 +-
 .../osgi/RemoteEventBusIntegrationTest.java        | 348 +++++++++++++++++++++
 .../test.bndrun                                    |  11 +-
 .../org.apache.aries.typedevent.remote.spi/pom.xml |  50 +++
 .../remote/spi/LocalEventConsumerManager.java      | 183 +++++++++++
 .../remote/spi/RemoteEventMonitorImpl.java         | 166 ++++++++++
 .../aries/typedevent/remote/spi/package-info.java  |   9 +-
 org.apache.aries.typedevent.remote/pom.xml         |  21 ++
 pom.xml                                            |  13 +
 28 files changed, 2020 insertions(+), 143 deletions(-)
 create mode 100644 org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/pom.xml
 copy org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/common/TestEvent2Consumer.java => org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/FilterDTO.java (68%)
 create mode 100644 org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteEventConstants.java
 create mode 100644 org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteEventMonitor.java
 copy org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/common/TestEvent2Consumer.java => org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteEvents.java (69%)
 copy org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/common/TestEvent2Consumer.java => org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteMonitorEvent.java (70%)
 copy org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventTask.java => org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/package-info.java (85%)
 copy {org.apache.aries.typedevent.bus => org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices}/pom.xml (75%)
 create mode 100644 org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/LocalEventBusForwarder.java
 create mode 100644 org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImpl.java
 create mode 100644 org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteServiceEventsActivator.java
 copy org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/MonitorEventTask.java => org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/spi/RemoteEventBus.java (51%)
 copy org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventTask.java => org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/spi/package-info.java (84%)
 copy {org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus => org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices}/common/TestEvent.java (92%)
 create mode 100644 org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImplTest.java
 copy {org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus => org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices}/osgi/AbstractIntegrationTest.java (80%)
 create mode 100644 org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/osgi/RemoteEventBusIntegrationTest.java
 copy {org.apache.aries.typedevent.bus => org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices}/test.bndrun (81%)
 create mode 100644 org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/pom.xml
 create mode 100644 org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/src/main/java/org/apache/aries/typedevent/remote/spi/LocalEventConsumerManager.java
 create mode 100644 org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/src/main/java/org/apache/aries/typedevent/remote/spi/RemoteEventMonitorImpl.java
 copy org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/EventTask.java => org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/src/main/java/org/apache/aries/typedevent/remote/spi/package-info.java (85%)
 create mode 100644 org.apache.aries.typedevent.remote/pom.xml


[aries-typedevent] 02/02: Initial support for remote events using OSGi Remote Services

Posted by ti...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

timothyjward pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/aries-typedevent.git

commit 07b5f41a8e5d8459801d7703a71f887a659ac55c
Author: Tim Ward <ti...@apache.org>
AuthorDate: Thu Oct 1 13:01:13 2020 +0100

    Initial support for remote events using OSGi Remote Services
---
 .../typedevent/bus/impl/TypedEventBusImpl.java     |  10 +-
 .../typedevent/bus/impl/TypedEventBusImplTest.java | 101 +++---
 .../typedevent/bus/osgi/FilterIntegrationTest.java |   8 +-
 .../osgi/UnhandledEventHandlerIntegrationTest.java |   3 +-
 .../org.apache.aries.typedevent.remote.api/pom.xml |  41 +++
 .../aries/typedevent/remote/api/FilterDTO.java     |  33 ++
 .../remote/api/RemoteEventConstants.java           |  49 +++
 .../typedevent/remote/api/RemoteEventMonitor.java  | 107 +++++++
 .../aries/typedevent/remote/api/RemoteEvents.java  |  27 ++
 .../typedevent/remote/api/RemoteMonitorEvent.java  |  32 ++
 .../aries/typedevent/remote/api/package-info.java  |  19 ++
 .../pom.xml                                        | 111 +++++++
 .../impl/LocalEventBusForwarder.java               | 191 +++++++++++
 .../remoteservices/impl/RemoteEventBusImpl.java    | 213 +++++++++++++
 .../impl/RemoteServiceEventsActivator.java         | 332 ++++++++++++++++++++
 .../remote/remoteservices/spi/RemoteEventBus.java  |  42 +++
 .../remote/remoteservices/spi/package-info.java    |  19 ++
 .../remote/remoteservices/common/TestEvent.java    |  21 ++
 .../impl/RemoteEventBusImplTest.java               | 151 +++++++++
 .../osgi/AbstractIntegrationTest.java              |  70 +++++
 .../osgi/RemoteEventBusIntegrationTest.java        | 348 +++++++++++++++++++++
 .../test.bndrun                                    |  56 ++++
 .../org.apache.aries.typedevent.remote.spi/pom.xml |  50 +++
 .../remote/spi/LocalEventConsumerManager.java      | 183 +++++++++++
 .../remote/spi/RemoteEventMonitorImpl.java         | 166 ++++++++++
 .../aries/typedevent/remote/spi/package-info.java  |  19 ++
 org.apache.aries.typedevent.remote/pom.xml         |  21 ++
 pom.xml                                            |  13 +
 28 files changed, 2382 insertions(+), 54 deletions(-)

diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
index 5be83f9..21a0828 100644
--- a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
+++ b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
@@ -19,6 +19,11 @@ package org.apache.aries.typedevent.bus.impl;
 
 import static java.util.Collections.emptyMap;
 import static java.util.stream.Collectors.toList;
+import static org.osgi.namespace.implementation.ImplementationNamespace.IMPLEMENTATION_NAMESPACE;
+import static org.osgi.namespace.service.ServiceNamespace.SERVICE_NAMESPACE;
+import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_FILTER;
+import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_IMPLEMENTATION;
+import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_SPECIFICATION_VERSION;
 import static org.osgi.util.converter.Converters.standardConverter;
 
 import java.lang.reflect.ParameterizedType;
@@ -34,6 +39,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Stream;
 
+import org.osgi.annotation.bundle.Capability;
 import org.osgi.framework.Constants;
 import org.osgi.framework.Filter;
 import org.osgi.framework.FrameworkUtil;
@@ -45,6 +51,8 @@ import org.osgi.service.typedevent.UnhandledEventHandler;
 import org.osgi.service.typedevent.UntypedEventHandler;
 import org.osgi.util.converter.TypeReference;
 
+@Capability(namespace=SERVICE_NAMESPACE, attribute="objectClass:List<String>=org.osgi.service.typedevent.TypedEventBus", uses=TypedEventBus.class)
+@Capability(namespace=IMPLEMENTATION_NAMESPACE, name=TYPED_EVENT_IMPLEMENTATION, version=TYPED_EVENT_SPECIFICATION_VERSION)
 public class TypedEventBusImpl implements TypedEventBus {
 
     private static final TypeReference<List<String>> LIST_OF_STRINGS = new TypeReference<List<String>>() {
@@ -248,7 +256,7 @@ public class TypedEventBusImpl implements TypedEventBus {
     }
 
     private Filter getFilter(Long serviceId, Map<String, Object> properties) throws IllegalArgumentException {
-        String key = "event.filter";
+        String key = TYPED_EVENT_FILTER;
         return getFilter(serviceId, key, properties.get(key));
     }
 
diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java
index 6667969..39bfbc9 100644
--- a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java
+++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java
@@ -21,6 +21,11 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.osgi.framework.Constants.SERVICE_ID;
+import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_FILTER;
+import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_TOPICS;
+import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_TYPE;
+import static org.osgi.util.converter.Converters.standardConverter;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -35,11 +40,9 @@ import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.osgi.framework.Constants;
-import org.osgi.service.typedevent.TypedEventConstants;
 import org.osgi.service.typedevent.TypedEventHandler;
 import org.osgi.service.typedevent.UnhandledEventHandler;
 import org.osgi.service.typedevent.UntypedEventHandler;
-import org.osgi.util.converter.Converters;
 
 public class TypedEventBusImplTest {
 
@@ -132,31 +135,31 @@ public class TypedEventBusImplTest {
 
         Map<String, Object> serviceProperties = new HashMap<>();
 
-        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
-        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName());
-        serviceProperties.put(Constants.SERVICE_ID, 42L);
+        serviceProperties.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
+        serviceProperties.put(TYPED_EVENT_TYPE, TestEvent.class.getName());
+        serviceProperties.put(SERVICE_ID, 42L);
 
         impl.addTypedEventHandler(handlerA, serviceProperties);
 
         serviceProperties = new HashMap<>();
 
-        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent2.class.getName().replace(".", "/"));
-        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent2.class.getName());
-        serviceProperties.put(Constants.SERVICE_ID, 43L);
+        serviceProperties.put(TYPED_EVENT_TOPICS, TestEvent2.class.getName().replace(".", "/"));
+        serviceProperties.put(TYPED_EVENT_TYPE, TestEvent2.class.getName());
+        serviceProperties.put(SERVICE_ID, 43L);
 
         impl.addTypedEventHandler(handlerB, serviceProperties);
 
         serviceProperties = new HashMap<>();
 
-        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
-        serviceProperties.put(Constants.SERVICE_ID, 44L);
+        serviceProperties.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
+        serviceProperties.put(SERVICE_ID, 44L);
 
         impl.addUntypedEventHandler(untypedHandlerA, serviceProperties);
 
         serviceProperties = new HashMap<>();
 
-        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent2.class.getName().replace(".", "/"));
-        serviceProperties.put(Constants.SERVICE_ID, 45L);
+        serviceProperties.put(TYPED_EVENT_TOPICS, TestEvent2.class.getName().replace(".", "/"));
+        serviceProperties.put(SERVICE_ID, 45L);
 
         impl.addUntypedEventHandler(untypedHandlerB, serviceProperties);
 
@@ -206,20 +209,20 @@ public class TypedEventBusImplTest {
         event.message = "boo";
         
         Map<String, Object> serviceProperties = new HashMap<>();
-        serviceProperties.put(Constants.SERVICE_ID, 42L);
+        serviceProperties.put(SERVICE_ID, 42L);
         
         impl.addTypedEventHandler(handler, serviceProperties);
         
         serviceProperties = new HashMap<>();
         
-        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, SpecialTestEvent.class.getName());
-        serviceProperties.put(Constants.SERVICE_ID, 43L);
+        serviceProperties.put(TYPED_EVENT_TYPE, SpecialTestEvent.class.getName());
+        serviceProperties.put(SERVICE_ID, 43L);
         
         impl.addTypedEventHandler(handler2, serviceProperties);
 
         serviceProperties = new HashMap<>();
         
-        serviceProperties.put(Constants.SERVICE_ID, 44L);
+        serviceProperties.put(SERVICE_ID, 44L);
         
         impl.addTypedEventHandler(handler3, serviceProperties);
         
@@ -258,35 +261,35 @@ public class TypedEventBusImplTest {
 
         Map<String, Object> serviceProperties = new HashMap<>();
 
-        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent.class.getName());
-        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName());
-        serviceProperties.put(Constants.SERVICE_ID, 42L);
+        serviceProperties.put(TYPED_EVENT_TOPICS, TestEvent.class.getName());
+        serviceProperties.put(TYPED_EVENT_TYPE, TestEvent.class.getName());
+        serviceProperties.put(SERVICE_ID, 42L);
 
         impl.addTypedEventHandler(handlerA, serviceProperties);
 
         serviceProperties = new HashMap<>();
 
-        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent2.class.getName());
-        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent2.class.getName());
-        serviceProperties.put(Constants.SERVICE_ID, 43L);
+        serviceProperties.put(TYPED_EVENT_TOPICS, TestEvent2.class.getName());
+        serviceProperties.put(TYPED_EVENT_TYPE, TestEvent2.class.getName());
+        serviceProperties.put(SERVICE_ID, 43L);
 
         impl.addTypedEventHandler(handlerB, serviceProperties);
 
         serviceProperties = new HashMap<>();
 
-        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent.class.getName());
-        serviceProperties.put(Constants.SERVICE_ID, 44L);
+        serviceProperties.put(TYPED_EVENT_TOPICS, TestEvent.class.getName());
+        serviceProperties.put(SERVICE_ID, 44L);
 
         impl.addUntypedEventHandler(untypedHandlerA, serviceProperties);
 
         serviceProperties = new HashMap<>();
 
-        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent2.class.getName());
-        serviceProperties.put(Constants.SERVICE_ID, 45L);
+        serviceProperties.put(TYPED_EVENT_TOPICS, TestEvent2.class.getName());
+        serviceProperties.put(SERVICE_ID, 45L);
 
         impl.addUntypedEventHandler(untypedHandlerB, serviceProperties);
 
-        impl.deliver(event.getClass().getName(), Converters.standardConverter().convert(event).to(Map.class));
+        impl.deliver(event.getClass().getName(), standardConverter().convert(event).to(Map.class));
 
         assertTrue(semA.tryAcquire(1, TimeUnit.SECONDS));
 
@@ -314,35 +317,35 @@ public class TypedEventBusImplTest {
 
         Map<String, Object> serviceProperties = new HashMap<>();
 
-        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
-        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName());
-        serviceProperties.put("event.filter", "(message=foo)");
-        serviceProperties.put(Constants.SERVICE_ID, 42L);
+        serviceProperties.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
+        serviceProperties.put(TYPED_EVENT_TYPE, TestEvent.class.getName());
+        serviceProperties.put(TYPED_EVENT_FILTER, "(message=foo)");
+        serviceProperties.put(SERVICE_ID, 42L);
 
         impl.addTypedEventHandler(handlerA, serviceProperties);
 
         serviceProperties = new HashMap<>();
 
-        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
-        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName());
-        serviceProperties.put("event.filter", "(message=bar)");
-        serviceProperties.put(Constants.SERVICE_ID, 43L);
+        serviceProperties.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
+        serviceProperties.put(TYPED_EVENT_TYPE, TestEvent.class.getName());
+        serviceProperties.put(TYPED_EVENT_FILTER, "(message=bar)");
+        serviceProperties.put(SERVICE_ID, 43L);
 
         impl.addTypedEventHandler(handlerB, serviceProperties);
 
         serviceProperties = new HashMap<>();
 
-        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
-        serviceProperties.put("event.filter", "(message=foo)");
-        serviceProperties.put(Constants.SERVICE_ID, 44L);
+        serviceProperties.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
+        serviceProperties.put(TYPED_EVENT_FILTER, "(message=foo)");
+        serviceProperties.put(SERVICE_ID, 44L);
 
         impl.addUntypedEventHandler(untypedHandlerA, serviceProperties);
 
         serviceProperties = new HashMap<>();
 
-        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
-        serviceProperties.put("event.filter", "(message=bar)");
-        serviceProperties.put(Constants.SERVICE_ID, 45L);
+        serviceProperties.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
+        serviceProperties.put(TYPED_EVENT_FILTER, "(message=bar)");
+        serviceProperties.put(SERVICE_ID, 45L);
 
         impl.addUntypedEventHandler(untypedHandlerB, serviceProperties);
 
@@ -395,10 +398,10 @@ public class TypedEventBusImplTest {
 
         Map<String, Object> serviceProperties = new HashMap<>();
 
-        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
-        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName());
-        serviceProperties.put("event.filter", "");
-        serviceProperties.put(Constants.SERVICE_ID, 42L);
+        serviceProperties.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
+        serviceProperties.put(TYPED_EVENT_TYPE, TestEvent.class.getName());
+        serviceProperties.put(TYPED_EVENT_FILTER, "");
+        serviceProperties.put(SERVICE_ID, 42L);
 
         impl.addTypedEventHandler(handlerA, serviceProperties);
 
@@ -420,10 +423,10 @@ public class TypedEventBusImplTest {
 
         Map<String, Object> serviceProperties = new HashMap<>();
 
-        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
-        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName());
-        serviceProperties.put("event.filter", "(message=foo)");
-        serviceProperties.put(Constants.SERVICE_ID, 42L);
+        serviceProperties.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
+        serviceProperties.put(TYPED_EVENT_TYPE, TestEvent.class.getName());
+        serviceProperties.put(TYPED_EVENT_FILTER, "(message=foo)");
+        serviceProperties.put(SERVICE_ID, 42L);
 
         impl.addTypedEventHandler(handlerA, serviceProperties);
 
diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/FilterIntegrationTest.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/FilterIntegrationTest.java
index d4dcc78..2d17847 100644
--- a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/FilterIntegrationTest.java
+++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/FilterIntegrationTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.aries.typedevent.bus.osgi;
 
+import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_FILTER;
+
 import java.util.Dictionary;
 import java.util.Hashtable;
 
@@ -71,12 +73,12 @@ public class FilterIntegrationTest extends AbstractIntegrationTest {
     @Test
     public void testFilteredListener() throws Exception {
         Dictionary<String, Object> props = new Hashtable<>();
-        props.put("event.filter", "(message=foo)");
+        props.put(TYPED_EVENT_FILTER, "(message=foo)");
         
         regs.add(context.registerService(TypedEventHandler.class, typedEventHandler, props));
         
         props = new Hashtable<>();
-        props.put("event.filter", "(message=bar)");
+        props.put(TYPED_EVENT_FILTER, "(message=bar)");
         
         regs.add(context.registerService(TypedEventHandler.class, typedEventHandlerB, props));
         
@@ -107,7 +109,7 @@ public class FilterIntegrationTest extends AbstractIntegrationTest {
     @Test
     public void testFilteredListenerEmptyString() throws Exception {
         Dictionary<String, Object> props = new Hashtable<>();
-        props.put("event.filter", "");
+        props.put(TYPED_EVENT_FILTER, "");
         
         
         regs.add(context.registerService(TypedEventHandler.class, typedEventHandler, props));
diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/UnhandledEventHandlerIntegrationTest.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/UnhandledEventHandlerIntegrationTest.java
index c781b62..084a92a 100644
--- a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/UnhandledEventHandlerIntegrationTest.java
+++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/UnhandledEventHandlerIntegrationTest.java
@@ -24,6 +24,7 @@ import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.after;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
+import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_FILTER;
 
 import java.util.Dictionary;
 import java.util.Hashtable;
@@ -128,7 +129,7 @@ public class UnhandledEventHandlerIntegrationTest extends AbstractIntegrationTes
     public void testUnhandledDueToFilter() throws InterruptedException {
         
         Dictionary<String, Object> props = new Hashtable<>();
-        props.put("event.filter", "(message=foo)");
+        props.put(TYPED_EVENT_FILTER, "(message=foo)");
         
         regs.add(context.registerService(TypedEventHandler.class, typedEventHandler, props));
         
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/pom.xml b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/pom.xml
new file mode 100644
index 0000000..a54960e
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/pom.xml
@@ -0,0 +1,41 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.aries.typedevent</groupId>
+        <artifactId>typedevent-parent</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    
+    <groupId>org.apache.aries.typedevent.remote</groupId>
+    <artifactId>org.apache.aries.typedevent.remote.api</artifactId>
+    
+    <dependencies>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.service.typedevent</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>osgi.annotation</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.annotation.bundle</artifactId>
+        </dependency>
+    </dependencies>
+    
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>biz.aQute.bnd</groupId>
+                <artifactId>bnd-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/FilterDTO.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/FilterDTO.java
new file mode 100644
index 0000000..56902c7
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/FilterDTO.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.remote.api;
+
+import org.osgi.annotation.versioning.ProviderType;
+import org.osgi.dto.DTO;
+
+/**
+ * A monitoring event filter.
+ *
+ * If both LDAP and regular expressions are supplied, then both must match.
+ */
+@ProviderType
+public class FilterDTO extends DTO {
+
+    public String ldapExpression;
+
+    public String regularExpression;
+}
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteEventConstants.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteEventConstants.java
new file mode 100644
index 0000000..9b0ab57
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteEventConstants.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.remote.api;
+
+import org.osgi.annotation.versioning.ProviderType;
+
+/**
+ * This interface should not be used by typical users of the
+ * Typed Event specification. It is intended to be a bridge
+ * between different mechanisms for broadcasting remote events
+ */
+
+@ProviderType
+public class RemoteEventConstants {
+    
+    /**
+     * This property key will be set to true in any event that originated from a remote system.
+     * This is to allow different remoting implementations to identify events which should not
+     * be sent on externally, as they are already external.
+     */
+    public static final String REMOTE_EVENT_MARKER = ".org.apache.aries.typedevent.remote";
+    
+    /**
+     * This service property can be used by Event Handler whiteboard services to signal that
+     * they wish to receive remote events by using the value <code>true</code>. Depending 
+     * upon the configuration of the remote event backend it may not be necessary to supply 
+     * this property to receive remote events.
+     */
+    public static final String RECEIVE_REMOTE_EVENTS = "org.apache.aries.typedevent.remote.events";
+
+    private RemoteEventConstants() {
+        // Deliberately impossible to construct
+    }
+    
+}
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteEventMonitor.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteEventMonitor.java
new file mode 100644
index 0000000..0bbcf9c
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteEventMonitor.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.remote.api;
+
+import org.osgi.annotation.versioning.ProviderType;
+import org.osgi.util.pushstream.PushStream;
+
+import java.time.Instant;
+
+/**
+ * The {@link RemoteEventMonitor} service can be used to monitor the events that are
+ * sent using the EventBus, and that are received from remote EventBus
+ * instances
+ */
+@ProviderType
+public interface RemoteEventMonitor {
+
+    /**
+     * Get a stream of events that match any of the filters, starting now.
+     * <p>
+     * Filter expressions may be supplied and applied by the monitoring implementation.
+     * In some cases this may be more optimal than adding your own filter to the returned
+     * PushStream.
+     *
+     * @param filters containing filter expression definitions. The {@link RemoteMonitorEvent#eventType}
+     *                field is available with the key <code>-eventType</code> and the
+     *                {@link RemoteMonitorEvent#publishType} field is available with the key
+     *                <code>-publishType</code>, in addition to fields defined in the event.
+     *                If the event contains nested data structures then those are accessible using
+     *                nested key names separated by a '.' character (e.g. <code>"foo.bar"</code>
+     *                would correspond to the <code>bar<code> field of the <code>foo</code> value
+     *                from the event.
+     *                <p>
+     *                If a {@link FilterDTO} contains both LDAP and regular expressions, then both must match.
+     *                A RegEx pattern allows the whole event content to be matched, without necessarily specifying
+     *                a key (although keys are present and separated with ':').
+     * @return A stream of event data
+     */
+    PushStream<RemoteMonitorEvent> monitorEvents(FilterDTO... filters);
+
+    /**
+     * Get a stream of events, including up to the
+     * requested number of historical data events, that match any of the filters.
+     *
+     * @param history The requested number of historical
+     * events, note that fewer than this number of events
+     * may be returned if history is unavailable, or if
+     * insufficient events have been sent.
+     *
+     * @param filters containing filter expression definitions. The {@link RemoteMonitorEvent#eventType}
+     *                field is available with the key <code>-eventType</code> and the
+     *                {@link RemoteMonitorEvent#publishType} field is available with the key
+     *                <code>-publishType</code>, in addition to fields defined in the event.
+     *                If the event contains nested data structures then those are accessible using
+     *                nested key names separated by a '.' character (e.g. <code>"foo.bar"</code>
+     *                would correspond to the <code>bar<code> field of the <code>foo</code> value
+     *                from the event.
+     *                <p>
+     *                If a {@link FilterDTO} contains both LDAP and regular expressions, then both must match.
+     *                A RegEx pattern allows the whole event content to be matched, without necessarily specifying
+     *                a key (although keys are present and separated with ':').
+     *
+     * @return A stream of event data
+     */
+    PushStream<RemoteMonitorEvent> monitorEvents(int history, FilterDTO...filters);
+
+    /**
+     * Get a stream of events, including historical
+     * data events prior to the supplied time
+     *
+     * @param history The requested time after which
+     * historical events, should be included. Note
+     * that events may have been discarded, or history
+     * unavailable.
+     *
+     * @param filters containing filter expression definitions. The {@link RemoteMonitorEvent#eventType}
+     *                field is available with the key <code>-eventType</code> and the
+     *                {@link RemoteMonitorEvent#publishType} field is available with the key
+     *                <code>-publishType</code>, in addition to fields defined in the event.
+     *                If the event contains nested data structures then those are accessible using
+     *                nested key names separated by a '.' character (e.g. <code>"foo.bar"</code>
+     *                would correspond to the <code>bar<code> field of the <code>foo</code> value
+     *                from the event.
+     *                <p>
+     *                If a {@link FilterDTO} contains both LDAP and regular expressions, then both must match.
+     *                A RegEx pattern allows the whole event content to be matched, without necessarily specifying
+     *                a key (although keys are present and separated with ':').
+     *
+     * @return A stream of event data
+     */
+    PushStream<RemoteMonitorEvent> monitorEvents(Instant history, FilterDTO...filters);
+
+}
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteEvents.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteEvents.java
new file mode 100644
index 0000000..a5ad278
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteEvents.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.remote.api;
+
+import org.osgi.service.component.annotations.ComponentPropertyType;
+
+/** 
+ * This annotation can be used on a DS component to mark it as wanting to receive remote events
+ */
+@ComponentPropertyType
+public @interface RemoteEvents {
+    public static final java.lang.String PREFIX_ = "org.apache.aries.typedevent.";
+}
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteMonitorEvent.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteMonitorEvent.java
new file mode 100644
index 0000000..2391b39
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteMonitorEvent.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.remote.api;
+
+import org.osgi.annotation.versioning.ProviderType;
+
+/**
+ * A monitoring event.
+ */
+@ProviderType
+public class RemoteMonitorEvent extends org.osgi.service.typedevent.monitor.MonitorEvent {
+
+    public static enum PublishType {
+        LOCAL, REMOTE;
+    }
+
+    public PublishType publishType;
+}
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/package-info.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/package-info.java
new file mode 100644
index 0000000..da953a5
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@org.osgi.annotation.bundle.Export
+@org.osgi.annotation.versioning.Version("0.0.1")
+package org.apache.aries.typedevent.remote.api;
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/pom.xml b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/pom.xml
new file mode 100644
index 0000000..6647b7a
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/pom.xml
@@ -0,0 +1,111 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.aries.typedevent</groupId>
+        <artifactId>typedevent-parent</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    <groupId>org.apache.aries.typedevent.remote.remoteservices</groupId>
+    <artifactId>org.apache.aries.typedevent.remote.remoteservices</artifactId>
+    
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.aries.typedevent</groupId>
+                <artifactId>typedevent-test-bom</artifactId>
+                <version>${project.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+    
+    <dependencies>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.service.typedevent</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>osgi.annotation</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.aries.typedevent.remote</groupId>
+            <artifactId>org.apache.aries.typedevent.remote.spi</artifactId>
+            <version>0.0.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.annotation.bundle</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.namespace.service</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.aries.component-dsl</groupId>
+            <artifactId>org.apache.aries.component-dsl.component-dsl</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.felix</groupId>
+            <artifactId>org.apache.felix.converter</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.aries.typedevent</groupId>
+            <artifactId>org.apache.aries.typedevent.bus</artifactId>
+            <version>0.0.1-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-junit-jupiter</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.test.junit5</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+        </dependency>
+    </dependencies>
+    
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>biz.aQute.bnd</groupId>
+                <artifactId>bnd-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>biz.aQute.bnd</groupId>
+                <artifactId>bnd-resolver-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>biz.aQute.bnd</groupId>
+                <artifactId>bnd-testing-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/LocalEventBusForwarder.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/LocalEventBusForwarder.java
new file mode 100644
index 0000000..9a1807f
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/LocalEventBusForwarder.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.remote.remoteservices.impl;
+
+import static java.util.Collections.emptyMap;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static org.osgi.util.converter.Converters.standardConverter;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+
+import org.apache.aries.typedevent.remote.remoteservices.spi.RemoteEventBus;
+import org.apache.aries.typedevent.remote.spi.LocalEventConsumerManager;
+import org.osgi.framework.Constants;
+import org.osgi.framework.Filter;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.util.converter.TypeReference;
+
+/**
+ * This class is responsible for taking events from the local framework and
+ * sending them on to interested remote frameworks
+ */
+public class LocalEventBusForwarder extends LocalEventConsumerManager {
+
+    private static final TypeReference<List<String>> LIST_OF_STRINGS = new TypeReference<List<String>>() {
+    };
+    
+    /**
+     * Map access and mutation must be synchronized on {@link #lock}. Values from
+     * the map should be copied as the contents are not thread safe.
+     */
+    private final Map<String, Map<RemoteEventBus, Filter>> eventTypeToRemotes = new HashMap<>();
+    
+    /**
+     * Map access and mutation must be synchronized on {@link #lock}.
+     * Values from the map should be copied as the contents are not thread safe.
+     */
+    private final Map<Long, List<String>> remoteTopicInterests = new HashMap<>();
+
+    /**
+     * Map access and mutation must be synchronized on {@link #lock}.
+     * Values from the map should be copied as the contents are not thread safe.
+     */
+    private final Map<Long, RemoteEventBus> remoteBuses = new HashMap<>();
+    
+    private final Object lock = new Object();
+
+    @Override
+    public void notifyUntyped(String topic, Map<String, Object> event) {
+        Map<RemoteEventBus, Filter> possibleTargets;
+        synchronized (lock) {
+            possibleTargets = eventTypeToRemotes.getOrDefault(topic, emptyMap());
+        }
+        
+        possibleTargets.entrySet().stream()
+            .filter(e -> e.getValue() == null || e.getValue().matches(event))
+            .map(Entry::getKey)
+            .forEach(r -> r.notify(topic, event));
+    }
+
+    private Long getServiceId(Map<String, Object> properties) {
+        return standardConverter().convert(properties.get(Constants.SERVICE_ID)).to(Long.class);
+    }
+
+    void addRemoteEventBus(RemoteEventBus remote, Map<String, Object> properties) {
+        doAdd(remote, properties);
+        updateRemoteInterest();
+    }
+
+    private void doAdd(RemoteEventBus remote, Map<String, Object> properties) {
+        Object consumed = properties.get(RemoteEventBus.REMOTE_EVENT_FILTERS);
+
+        if (consumed == null) {
+            // TODO log a broken behaviour
+            return;
+        }
+
+        Map<String, Filter> topicsToFilters = standardConverter().convert(consumed).to(LIST_OF_STRINGS)
+                .stream()
+                .map(s -> s.split("=", 2))
+                .collect(toMap(s -> s[0], s -> safeCreateFilter(s[1])));
+
+        Long serviceId = getServiceId(properties);
+
+        List<String> interestedTopics = topicsToFilters.keySet().stream().collect(toList());
+        synchronized (lock) {
+            remoteBuses.put(serviceId, remote);
+            remoteTopicInterests.put(serviceId, interestedTopics);
+            
+            interestedTopics.forEach(s -> {
+                Map<RemoteEventBus, Filter> perTopicMap = eventTypeToRemotes
+                        .computeIfAbsent(s, x -> new HashMap<>());
+                perTopicMap.put(remote, topicsToFilters.get(s));
+            });
+        }
+    }
+    
+    private Filter safeCreateFilter(String filterString) {
+        try {
+            return FrameworkUtil.createFilter(filterString);
+        } catch (InvalidSyntaxException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+            try {
+                return FrameworkUtil.createFilter("(&(x=true)(x=false))");
+            } catch (InvalidSyntaxException e1) {
+                // TODO log properly
+                throw new RuntimeException("Serious problem!");
+            }
+        }
+    }
+
+    void updatedRemoteEventBus(Map<String, Object> properties) {
+        Long serviceId = getServiceId(properties);
+        synchronized (lock) {
+            RemoteEventBus remote = remoteBuses.get(serviceId);
+            doRemove(remote, properties);
+            doAdd(remote, properties);
+        }
+        updateRemoteInterest();
+    }
+
+    void removeRemoteEventBus(RemoteEventBus remote, Map<String, Object> properties) {
+        doRemove(remote, properties);
+        updateRemoteInterest();
+    }
+
+    private void doRemove(RemoteEventBus remote, Map<String, Object> properties) {
+        Long serviceId = getServiceId(properties);
+
+        synchronized (lock) {
+            remoteBuses.remove(serviceId);
+            List<String> consumed = remoteTopicInterests.remove(serviceId);
+            if(consumed != null) {
+                consumed.forEach(s -> {
+                    Map<RemoteEventBus, ?> perTopic = eventTypeToRemotes.get(s);
+                    if(perTopic != null) {
+                        perTopic.remove(remote);
+                        if(perTopic.isEmpty()) {
+                            eventTypeToRemotes.remove(s);
+                        }
+                    }
+                });
+            }
+        }
+    }
+
+    private void updateRemoteInterest() {
+
+        Map<String, String> targets;
+        synchronized (lock) {
+            targets = eventTypeToRemotes.entrySet().stream()
+            .collect(Collectors.toMap(Entry::getKey, 
+                    e -> e.getValue().values().stream()
+                    .map(f -> f == null ? "" : f.toString())
+                    .reduce("", this::mergeFilterStrings)));
+            
+        }
+        
+        updateTargets(targets);
+    }
+    
+    private String mergeFilterStrings(String a, String b) {
+        if(a == null || "".equals(a)) {
+            return b == null ? "" : b;
+        } else if (b == null || "".equals(b)) {
+            return a;
+        } else {
+            return "(|" + a + b + ")";
+        }
+    }
+}
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImpl.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImpl.java
new file mode 100644
index 0000000..67f63e4
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImpl.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.remote.remoteservices.impl;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static org.osgi.namespace.service.ServiceNamespace.SERVICE_NAMESPACE;
+
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+
+import org.apache.aries.typedevent.remote.api.RemoteEventConstants;
+import org.apache.aries.typedevent.remote.remoteservices.spi.RemoteEventBus;
+import org.osgi.annotation.bundle.Capability;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.Filter;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.typedevent.TypedEventBus;
+import org.osgi.service.typedevent.annotations.RequireTypedEvents;
+
+/**
+ * This class implements {@link RemoteEventBus} and is responsible for receiving
+ * events from remote frameworks and publishing them in the local framework
+ */
+@Capability(namespace=SERVICE_NAMESPACE, attribute="objectClass:List<String>=org.apache.aries.typedevent.remote.remoteservices.spi.RemoteEventBus", uses=RemoteEventBus.class)
+@RequireTypedEvents
+public class RemoteEventBusImpl implements RemoteEventBus {
+    
+    private final TypedEventBus eventBus;
+    
+    private ServiceRegistration<RemoteEventBus> reg;
+    
+    private Map<String, Filter> topicsToFilters = new HashMap<>();
+    
+    private final Map<Long, Map<String, Filter>> servicesToInterests = new HashMap<>();
+    
+    private final Object lock = new Object();
+
+    public RemoteEventBusImpl(TypedEventBus eventBus) {
+        this.eventBus = eventBus;
+    }
+    
+    public void init(BundleContext ctx) {
+        ServiceRegistration<RemoteEventBus> reg = ctx.registerService(RemoteEventBus.class, this, null);
+        
+        Map<String, Filter> filters;
+        synchronized(lock) {
+            this.reg = reg;
+            filters = topicsToFilters;
+        }
+        updateReg(filters);
+    }
+
+    public void destroy() {
+        try {
+            ServiceRegistration<?> reg;
+            synchronized (lock) {
+                reg = this.reg;
+                this.reg = null;
+            }
+            
+            if(reg != null) {
+                reg.unregister();
+            }
+        } catch (IllegalStateException ise) {
+            // TODO log
+        }
+    }
+    
+    @Override
+    public void notify(String topic, Map<String, Object> properties) {
+        
+        boolean hasTopicInterest;
+        Filter filter;
+        synchronized (lock) {
+            hasTopicInterest = topicsToFilters.containsKey(topic);
+            filter = topicsToFilters.get(topic);
+        }
+        
+        if(hasTopicInterest) {
+            if(filter == null || filter.matches(properties)) {
+                properties.put(RemoteEventConstants.REMOTE_EVENT_MARKER, Boolean.TRUE);
+                eventBus.deliverUntyped(topic, properties);
+            } else {
+                //TODO log filter mismatch
+            }
+        } else {
+            // TODO log topic mismatch
+        }
+    }
+
+    /**
+     * Update the data structures and registration to reflect the topic interests
+     * of the local framework
+     * 
+     * @param id
+     * @param topics
+     * @param filter
+     */
+    void updateLocalInterest(Long id, List<String> topics, Filter filter) {
+
+        boolean doUpdate = false;
+
+        Map<String, Filter> newData = topics.stream()
+                .collect(toMap(identity(), x -> filter, (a,b) -> a));
+        
+        Map<String, Filter> updatedFilters;
+        synchronized(lock) {
+            doUpdate = true;
+            servicesToInterests.put(id, newData);
+            topicsToFilters = getUpdatedFilters();
+            updatedFilters = topicsToFilters;
+        }
+        
+        if(doUpdate) {
+            updateReg(updatedFilters);
+        }
+    }
+
+    private Map<String, Filter> getUpdatedFilters() {
+        synchronized (lock) {
+            return servicesToInterests.values().stream()
+                    .flatMap(m -> m.entrySet().stream())
+                    .collect(Collectors.toMap(Entry::getKey, Entry::getValue, 
+                            this::combineFilters));
+        }
+    }
+
+    private Filter combineFilters(Filter a, Filter b) {
+        if(a == null) {
+            return b;
+        } else if (b == null) {
+            return a;
+        } else {
+            try {
+                return FrameworkUtil.createFilter("(|" + a.toString() + b.toString() + ")");
+            } catch (InvalidSyntaxException e) {
+                // TODO Auto-generated catch block
+                throw new RuntimeException(e);
+            }
+        }
+    }
+    
+    private void updateReg(Map<String, Filter> filters) {
+        
+        Hashtable<String, Object> props = new Hashtable<>();
+        
+        props.put(Constants.SERVICE_EXPORTED_INTERFACES, RemoteEventBus.class.getName());
+        props.put(Constants.SERVICE_EXPORTED_INTENTS, "osgi.basic");
+        List<String> remoteFilters = filters.entrySet().stream()
+                .map(e -> e.getKey() + "=" + (e.getValue() == null ? "" : e.getValue().toString()))
+                .collect(toList());
+        props.put(REMOTE_EVENT_FILTERS, remoteFilters);
+        
+        
+        ServiceRegistration<?> reg;
+        synchronized (lock) {
+            reg = this.reg;
+        }
+        
+        if(reg != null) {
+            // Only update if there is a change
+            Object existingFilters = reg.getReference().getProperty(REMOTE_EVENT_FILTERS);
+            if(!remoteFilters.equals(existingFilters)) {
+                reg.setProperties(props);
+            }
+            // Deal with a race condition if
+            Map<String, Filter> updatedFilters;
+            synchronized (lock) {
+                updatedFilters = topicsToFilters;
+            }
+            if(!updatedFilters.equals(filters)) {
+                updateReg(updatedFilters);
+            }
+        }
+    }
+
+    void removeLocalInterest(Long id) {
+
+        Map<String, Filter> updatedFilters; 
+        synchronized(lock) {
+            if(servicesToInterests.remove(id) == null) {
+                return;
+            }
+            topicsToFilters = getUpdatedFilters();
+            updatedFilters = topicsToFilters;
+        }
+        
+        updateReg(updatedFilters);
+    }
+}
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteServiceEventsActivator.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteServiceEventsActivator.java
new file mode 100644
index 0000000..83b80dd
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteServiceEventsActivator.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.remote.remoteservices.impl;
+
+import static java.lang.Boolean.TRUE;
+import static java.util.function.Function.identity;
+import static org.apache.aries.component.dsl.OSGi.all;
+import static org.apache.aries.component.dsl.OSGi.bundleContext;
+import static org.apache.aries.component.dsl.OSGi.coalesce;
+import static org.apache.aries.component.dsl.OSGi.configuration;
+import static org.apache.aries.component.dsl.OSGi.just;
+import static org.apache.aries.component.dsl.OSGi.once;
+import static org.apache.aries.component.dsl.OSGi.register;
+import static org.apache.aries.component.dsl.OSGi.service;
+import static org.apache.aries.component.dsl.OSGi.serviceReferences;
+import static org.apache.aries.typedevent.remote.spi.LocalEventConsumerManager.ARIES_LOCAL_EVENT_PROXY;
+import static org.osgi.framework.Constants.BUNDLE_ACTIVATOR;
+import static org.osgi.framework.Constants.SERVICE_ID;
+import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_TOPICS;
+import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_FILTER;
+import static org.osgi.util.converter.Converters.standardConverter;
+
+import java.lang.reflect.ParameterizedType;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.aries.component.dsl.OSGi;
+import org.apache.aries.component.dsl.OSGiResult;
+import org.apache.aries.typedevent.remote.api.RemoteEventMonitor;
+import org.apache.aries.typedevent.remote.remoteservices.spi.RemoteEventBus;
+import org.apache.aries.typedevent.remote.spi.RemoteEventMonitorImpl;
+import org.osgi.annotation.bundle.Header;
+import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Filter;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.typedevent.TypedEventBus;
+import org.osgi.service.typedevent.TypedEventConstants;
+import org.osgi.service.typedevent.TypedEventHandler;
+import org.osgi.service.typedevent.UntypedEventHandler;
+import org.osgi.service.typedevent.monitor.TypedEventMonitor;
+import org.osgi.util.converter.TypeReference;
+import org.osgi.util.tracker.ServiceTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Header(name = BUNDLE_ACTIVATOR, value = "${@class}")
+public class RemoteServiceEventsActivator implements BundleActivator {
+
+    private static final Logger _log = LoggerFactory.getLogger(RemoteServiceEventsActivator.class);
+
+    OSGiResult eventBus;
+
+    @Override
+    public void start(BundleContext bundleContext) throws Exception {
+        if (_log.isDebugEnabled()) {
+            _log.debug("Aries Remote Typed Events (Remote Services) Starting");
+        }
+
+        eventBus = coalesce(configuration("org.apache.aries.typedevent.remote.remoteservices"), just(Hashtable::new))
+                .map(this::toConfigProps).flatMap(configuration -> createProgram(configuration)).run(bundleContext);
+
+        if (_log.isDebugEnabled()) {
+            _log.debug("Aries Typed Event Bus Started");
+        }
+    }
+
+    private OSGi<?> createProgram(Map<String, ?> configuration) {
+
+        OSGi<Object> monitor = service(once(serviceReferences(TypedEventMonitor.class)))
+                .map(RemoteEventMonitorImpl::new)
+                .flatMap(remi -> register(RemoteEventMonitor.class, remi, new HashMap<>()));
+
+        OSGi<Object> remote = bundleContext().flatMap(ctx -> service(once(serviceReferences(TypedEventBus.class)))
+                .map(RemoteEventBusImpl::new).effects(rebi -> rebi.init(ctx), rebi -> rebi.destroy())
+                .flatMap(rebi -> all(
+                        just(new UntypedEventTracker(ctx, rebi)).map(ServiceTracker.class::cast)
+                                .effects(st -> st.open(), st -> st.close()),
+                        just(new TypedEventTracker(ctx, rebi)).map(ServiceTracker.class::cast).effects(st -> st.open(),
+                                st -> st.close()))));
+
+        OSGi<Object> local = bundleContext()
+                .flatMap(ctx -> just(new LocalEventBusForwarder()).effects(lebf -> lebf.start(ctx), lebf -> lebf.stop())
+                        .flatMap(lebf -> serviceReferences(RemoteEventBus.class, "(service.imported=true)", csr -> {
+                            lebf.updatedRemoteEventBus(getServiceProps(csr.getServiceReference()));
+                            return false;
+                        }).flatMap(csr -> service(csr).effects(
+                                reb -> lebf.addRemoteEventBus(reb, getServiceProps(csr.getServiceReference())),
+                                reb -> lebf.removeRemoteEventBus(reb, getServiceProps(csr.getServiceReference()))))));
+
+        return all(monitor, remote, local);
+    }
+
+    private Map<String, Object> toConfigProps(Dictionary<String, ?> config) {
+        Enumeration<String> keys = config.keys();
+        Map<String, Object> map = new HashMap<>();
+        while (keys.hasMoreElements()) {
+            String key = keys.nextElement();
+            map.put(key, config.get(key));
+        }
+        return map;
+    }
+
+    private Map<String, Object> getServiceProps(ServiceReference<?> ref) {
+        return Arrays.stream(ref.getPropertyKeys()).collect(Collectors.toMap(identity(), ref::getProperty));
+    }
+
+    @Override
+    public void stop(BundleContext context) throws Exception {
+        if (_log.isDebugEnabled()) {
+            _log.debug("Aries Typed Event Bus Stopping");
+        }
+
+        eventBus.close();
+
+        if (_log.isDebugEnabled()) {
+            _log.debug("Aries Typed Event Bus Stopped");
+        }
+    }
+
+    private static final TypeReference<List<String>> LIST_OF_STRINGS = new TypeReference<List<String>>() {
+    };
+
+    private static Long getServiceId(ServiceReference<?> ref) {
+        return standardConverter().convert(ref.getProperty(SERVICE_ID)).to(Long.class);
+    }
+
+    private static List<String> getTopics(ServiceReference<?> ref) {
+        return standardConverter().convert(ref.getProperty(TYPED_EVENT_TOPICS)).to(LIST_OF_STRINGS);
+    }
+
+    private static Filter getFilter(ServiceReference<?> ref) throws InvalidSyntaxException {
+        String filter = standardConverter().convert(ref.getProperty(TYPED_EVENT_FILTER)).to(String.class);
+        if (filter == null || "".equals(filter)) {
+            return null;
+        } else {
+            return FrameworkUtil.createFilter(filter);
+        }
+    }
+
+    private static class UntypedEventTracker extends ServiceTracker<UntypedEventHandler, Object> {
+
+        private final RemoteEventBusImpl impl;
+
+        public UntypedEventTracker(BundleContext context, RemoteEventBusImpl impl) {
+            super(context, UntypedEventHandler.class, null);
+            this.impl = impl;
+        }
+
+        @Override
+        public Object addingService(ServiceReference<UntypedEventHandler> reference) {
+            
+            if(TRUE.equals(reference.getProperty(ARIES_LOCAL_EVENT_PROXY))) {
+                // Ignore remote interest proxies
+                return null;
+            }
+            
+            Filter filter;
+            try {
+                filter = getFilter(reference);
+            } catch (InvalidSyntaxException e) {
+                // TODO Auto-generated catch block
+                return reference;
+            }
+            impl.updateLocalInterest(getServiceId(reference), getTopics(reference), filter);
+            return reference;
+        }
+
+        @Override
+        public void modifiedService(ServiceReference<UntypedEventHandler> reference, Object service) {
+            Filter filter;
+            try {
+                filter = getFilter(reference);
+            } catch (InvalidSyntaxException e) {
+                // TODO Auto-generated catch block
+                impl.removeLocalInterest(getServiceId(reference));
+                return;
+            }
+            impl.updateLocalInterest(getServiceId(reference), getTopics(reference), filter);
+        }
+
+        @Override
+        public void removedService(ServiceReference<UntypedEventHandler> reference, Object service) {
+            impl.removeLocalInterest(getServiceId(reference));
+        }
+    };
+
+    @SuppressWarnings("rawtypes")
+    private static class TypedEventTracker extends ServiceTracker<TypedEventHandler, TypedEventHandler> {
+
+        private final RemoteEventBusImpl impl;
+
+        public TypedEventTracker(BundleContext context, RemoteEventBusImpl impl) {
+            super(context, TypedEventHandler.class, null);
+            this.impl = impl;
+        }
+
+        @Override
+        public TypedEventHandler addingService(ServiceReference<TypedEventHandler> reference) {
+            TypedEventHandler toReturn = context.getService(reference);
+            Filter filter;
+            try {
+                filter = getFilter(reference);
+            } catch (InvalidSyntaxException e) {
+                // TODO Auto-generated catch block
+                return toReturn;
+            }
+            List<String> topics = findTopics(reference, toReturn);
+            if (!topics.isEmpty()) {
+                impl.updateLocalInterest(getServiceId(reference), topics, filter);
+            }
+            return toReturn;
+        }
+
+        private List<String> findTopics(ServiceReference<TypedEventHandler> reference, TypedEventHandler service) {
+            List<String> topics = getTopics(reference);
+            if (topics.isEmpty()) {
+                Object type = reference.getProperty(TypedEventConstants.TYPED_EVENT_TYPE);
+                if (type != null) {
+                    topics = Collections.singletonList(String.valueOf(type).replace(".", "/"));
+                } else {
+                    Class<?> clazz = discoverTypeForTypedHandler(service);
+                    if (clazz != null) {
+                        topics = Collections.singletonList(clazz.getName().replace(".", "/"));
+                    }
+                }
+            }
+            return topics;
+        }
+
+        /**
+         * Extensively copied from the Core Event Bus - is there a better way to share
+         * this?
+         * 
+         * @param handler
+         * @param properties
+         * @return
+         */
+        private Class<?> discoverTypeForTypedHandler(TypedEventHandler<?> handler) {
+            Class<?> clazz = null;
+            Class<?> toCheck = handler.getClass();
+            while (clazz == null) {
+                clazz = findDirectlyImplemented(toCheck);
+
+                if (clazz != null) {
+                    break;
+                }
+
+                clazz = processInterfaceHierarchyForClass(toCheck);
+
+                if (clazz != null) {
+                    break;
+                }
+
+                toCheck = toCheck.getSuperclass();
+            }
+
+            return clazz;
+        }
+
+        private Class<?> processInterfaceHierarchyForClass(Class<?> toCheck) {
+            Class<?> clazz = null;
+            for (Class<?> iface : toCheck.getInterfaces()) {
+                clazz = findDirectlyImplemented(iface);
+
+                if (clazz != null) {
+                    break;
+                }
+
+                clazz = processInterfaceHierarchyForClass(iface);
+
+                if (clazz != null) {
+                    break;
+                }
+            }
+            return clazz;
+        }
+
+        private Class<?> findDirectlyImplemented(Class<?> toCheck) {
+            return Arrays.stream(toCheck.getGenericInterfaces()).filter(ParameterizedType.class::isInstance)
+                    .map(ParameterizedType.class::cast).filter(t -> TypedEventHandler.class.equals(t.getRawType()))
+                    .map(t -> t.getActualTypeArguments()[0]).findFirst().map(Class.class::cast).orElse(null);
+        }
+
+        @Override
+        public void modifiedService(ServiceReference<TypedEventHandler> reference, TypedEventHandler service) {
+            Filter filter;
+            try {
+                filter = getFilter(reference);
+            } catch (InvalidSyntaxException e) {
+                // TODO Auto-generated catch block
+                impl.removeLocalInterest(getServiceId(reference));
+                return;
+            }
+
+            List<String> topics = findTopics(reference, service);
+            if (topics.isEmpty()) {
+                impl.removeLocalInterest(getServiceId(reference));
+            } else {
+                impl.updateLocalInterest(getServiceId(reference), getTopics(reference), filter);
+            }
+        }
+
+        @Override
+        public void removedService(ServiceReference<TypedEventHandler> reference, TypedEventHandler service) {
+            impl.removeLocalInterest(getServiceId(reference));
+        }
+    };
+}
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/spi/RemoteEventBus.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/spi/RemoteEventBus.java
new file mode 100644
index 0000000..2c0336e
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/spi/RemoteEventBus.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.remote.remoteservices.spi;
+
+import java.util.Map;
+
+import org.osgi.annotation.versioning.ProviderType;
+
+/**
+ * This interface should not be used by typical users of the
+ * Typed Event specification. It is intended to be a bridge
+ * between different mechanisms for broadcasting remote events
+ */
+
+@ProviderType
+public interface RemoteEventBus {
+    
+    /**
+     * This service property provides a String+ containing <topic>=<filter> 
+     * entries indicating the events that the remote nodes are interested in.
+     */
+    public static final String REMOTE_EVENT_FILTERS = "remote.event.filters";
+    
+    /**   
+     * Called to notify this instance of an event from a remote framework
+     */
+    public void notify(String topic, Map<String, Object> eventData);
+}
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/spi/package-info.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/spi/package-info.java
new file mode 100644
index 0000000..ed6cdcc
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/spi/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@org.osgi.annotation.bundle.Export
+@org.osgi.annotation.versioning.Version("0.0.1")
+package org.apache.aries.typedevent.remote.remoteservices.spi;
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/common/TestEvent.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/common/TestEvent.java
new file mode 100644
index 0000000..3d6dc4e
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/common/TestEvent.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.remote.remoteservices.common;
+
+public class TestEvent {
+    public String message;
+}
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImplTest.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImplTest.java
new file mode 100644
index 0000000..9ddd8e1
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImplTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.remote.remoteservices.impl;
+
+import static java.util.Collections.emptyList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.osgi.framework.FrameworkUtil.createFilter;
+
+import java.util.Arrays;
+import java.util.Dictionary;
+
+import org.apache.aries.typedevent.remote.remoteservices.spi.RemoteEventBus;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.typedevent.TypedEventBus;
+
+@SuppressWarnings("unchecked")
+public class RemoteEventBusImplTest {
+    
+    @Mock
+    BundleContext context;
+    
+    @Mock
+    ServiceRegistration<RemoteEventBus> remoteReg;
+    @Mock
+    ServiceReference<RemoteEventBus> remoteRef;
+    
+    @Mock
+    TypedEventBus eventBusImpl;
+    
+    RemoteEventBusImpl remoteImpl;
+    
+    private AutoCloseable mocks;
+    
+    @BeforeEach
+    public void start() {
+        
+        mocks = MockitoAnnotations.openMocks(this);
+        
+        Mockito.when(context.registerService(Mockito.eq(RemoteEventBus.class), 
+                Mockito.any(RemoteEventBus.class), Mockito.any())).thenReturn(remoteReg);
+        Mockito.when(remoteReg.getReference()).thenReturn(remoteRef);
+        
+        remoteImpl = new RemoteEventBusImpl(eventBusImpl);
+    }
+
+    
+    @AfterEach
+    public void destroy() throws Exception {
+        remoteImpl.destroy();
+        mocks.close();
+    }
+    
+    @Test
+    public void testEmptyStart() {
+        remoteImpl.init(context);
+        
+        ArgumentCaptor<Dictionary<String, Object>> propsCaptor = ArgumentCaptor.forClass(Dictionary.class); 
+        
+        Mockito.verify(context).registerService(Mockito.eq(RemoteEventBus.class), Mockito.same(remoteImpl),
+                propsCaptor.capture());
+        
+        Dictionary<String, Object> props = propsCaptor.getValue();
+        assertNull(props);
+        
+        Mockito.verify(remoteReg).setProperties(propsCaptor.capture());
+        
+        props = propsCaptor.getValue();
+        
+        assertEquals(RemoteEventBus.class.getName(), props.get("service.exported.interfaces"));
+        assertEquals(emptyList(), props.get(RemoteEventBus.REMOTE_EVENT_FILTERS));
+    }
+
+    @Test
+    public void testStartWithDetails() throws InvalidSyntaxException {
+        
+        remoteImpl.updateLocalInterest(42L, Arrays.asList("FOO"), createFilter("(fizz=buzz)"));
+        
+        remoteImpl.init(context);
+        
+        ArgumentCaptor<Dictionary<String, Object>> propsCaptor = ArgumentCaptor.forClass(Dictionary.class); 
+        
+        Mockito.verify(context).registerService(Mockito.eq(RemoteEventBus.class), Mockito.same(remoteImpl),
+                propsCaptor.capture());
+    
+        Dictionary<String, Object> props = propsCaptor.getValue();
+        assertNull(props);
+
+        Mockito.verify(remoteReg).setProperties(propsCaptor.capture());
+        
+        props = propsCaptor.getValue();
+        
+        assertEquals(RemoteEventBus.class.getName(), props.get("service.exported.interfaces"));
+        assertEquals(Arrays.asList("FOO=(fizz=buzz)"), props.get(RemoteEventBus.REMOTE_EVENT_FILTERS));
+    }
+    
+    @Test
+    public void testLateRegisterOfListener() throws InvalidSyntaxException {
+        remoteImpl.init(context);
+        
+        ArgumentCaptor<Dictionary<String, Object>> propsCaptor = ArgumentCaptor.forClass(Dictionary.class); 
+        
+        Mockito.verify(context).registerService(Mockito.eq(RemoteEventBus.class), Mockito.same(remoteImpl),
+                propsCaptor.capture());
+        
+        Dictionary<String, Object> props = propsCaptor.getValue();
+        assertNull(props);
+        
+        Mockito.verify(remoteReg).setProperties(propsCaptor.capture());
+        
+        props = propsCaptor.getValue();
+        
+        assertEquals(RemoteEventBus.class.getName(), props.get("service.exported.interfaces"));
+        assertEquals(emptyList(), props.get(RemoteEventBus.REMOTE_EVENT_FILTERS));
+        
+        // Add a listener to the remote
+        
+        remoteImpl.updateLocalInterest(42L, Arrays.asList("FOO"), createFilter("(fizz=buzz)"));
+        
+        Mockito.verify(remoteReg, Mockito.times(2)).setProperties(propsCaptor.capture());
+        
+        props = propsCaptor.getValue();
+        
+        assertEquals(RemoteEventBus.class.getName(), props.get("service.exported.interfaces"));
+        assertEquals(Arrays.asList("FOO=(fizz=buzz)"), props.get(RemoteEventBus.REMOTE_EVENT_FILTERS));
+    }
+}
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/osgi/AbstractIntegrationTest.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/osgi/AbstractIntegrationTest.java
new file mode 100644
index 0000000..dfcf586
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/osgi/AbstractIntegrationTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.remote.remoteservices.osgi;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.aries.typedevent.remote.remoteservices.common.TestEvent;
+import org.junit.jupiter.api.AfterEach;
+import org.mockito.ArgumentMatcher;
+import org.osgi.framework.ServiceRegistration;
+
+/**
+ * This is a JUnit test that will be run inside an OSGi framework.
+ * 
+ * It can interact with the framework by starting or stopping bundles,
+ * getting or registering services, or in other ways, and then observing
+ * the result on the bundle(s) being tested.
+ */
+public abstract class AbstractIntegrationTest {
+    
+    protected static final String TEST_EVENT_TOPIC = TestEvent.class.getName().replace(".", "/");
+
+    
+    protected final List<ServiceRegistration<?>> regs = new ArrayList<ServiceRegistration<?>>();
+    
+    @AfterEach
+    public void tearDown() throws Exception {
+        regs.forEach(sr -> {
+            try {
+                sr.unregister();
+            } catch (Exception e) { }
+        });
+    }
+    
+    protected ArgumentMatcher<TestEvent> isTestEventWithMessage(String message) {
+        return new ArgumentMatcher<TestEvent>() {
+            
+            @Override
+            public boolean matches(TestEvent argument) {
+                return message.equals(argument.message);
+            }
+        };
+    }
+    
+    protected ArgumentMatcher<Map<String, Object>> isUntypedTestEventWithMessage(String message) {
+        return new ArgumentMatcher<Map<String, Object>>() {
+            
+            @Override
+            public boolean matches(Map<String, Object> argument) {
+                return argument != null && message.equals(argument.get("message"));
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/osgi/RemoteEventBusIntegrationTest.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/osgi/RemoteEventBusIntegrationTest.java
new file mode 100644
index 0000000..9788848
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/osgi/RemoteEventBusIntegrationTest.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.remote.remoteservices.osgi;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_FILTER;
+import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_TOPICS;
+
+import java.io.File;
+import java.lang.reflect.Proxy;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.ServiceLoader;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.aries.typedevent.remote.remoteservices.common.TestEvent;
+import org.apache.aries.typedevent.remote.remoteservices.spi.RemoteEventBus;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.BundleException;
+import org.osgi.framework.Constants;
+import org.osgi.framework.ServiceException;
+import org.osgi.framework.ServiceFactory;
+import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.framework.launch.Framework;
+import org.osgi.framework.launch.FrameworkFactory;
+import org.osgi.framework.wiring.FrameworkWiring;
+import org.osgi.service.typedevent.TypedEventBus;
+import org.osgi.service.typedevent.UnhandledEventHandler;
+import org.osgi.service.typedevent.UntypedEventHandler;
+import org.osgi.test.common.annotation.InjectBundleContext;
+import org.osgi.test.common.annotation.InjectService;
+import org.osgi.test.junit5.context.BundleContextExtension;
+import org.osgi.test.junit5.service.ServiceExtension;
+import org.osgi.util.tracker.ServiceTracker;
+
+/**
+ * This is a JUnit test that will be run inside an OSGi framework.
+ * 
+ * It can interact with the framework by starting or stopping bundles, getting
+ * or registering services, or in other ways, and then observing the result on
+ * the bundle(s) being tested.
+ */
+@ExtendWith(BundleContextExtension.class)
+@ExtendWith(ServiceExtension.class)
+public class RemoteEventBusIntegrationTest extends AbstractIntegrationTest {
+
+    private static final String REMOTE_BUS = RemoteEventBus.class.getName();
+    private static final String UNTYPED_HANDLER = UntypedEventHandler.class.getName();
+    private static final String UNHANDLED_HANDLER = UnhandledEventHandler.class.getName();
+    private Map<UUID, Framework> frameworks;
+    private Map<UUID, ServiceTracker<?,?>> remoteServicePublishers = new ConcurrentHashMap<>();
+    
+    @InjectBundleContext
+    BundleContext bundleContext;
+    
+    @InjectService
+    TypedEventBus bus;
+
+    @Mock
+    UntypedEventHandler untypedEventHandler;
+
+    @Mock
+    UnhandledEventHandler unhandledEventHandler;
+    
+    AutoCloseable mocks;
+    
+    @BeforeEach
+    public void setUpFrameworks() throws Exception {
+        mocks = MockitoAnnotations.openMocks(this);
+        
+        assertNotNull(bundleContext, "OSGi Bundle tests must be run inside an OSGi framework");
+
+        frameworks = createFrameworks(2);
+        frameworks.put(getMasterFrameworkUUID(), bundleContext.getBundle(0).adapt(Framework.class));
+        
+        for (Entry<UUID, Framework> entry : frameworks.entrySet()) {
+            Framework f = entry.getValue();
+            
+            BundleContext context = f.getBundleContext();
+            ServiceTracker<Object, Object> tracker = createCrossFrameworkPublisher(entry, context);
+            
+            remoteServicePublishers.put(entry.getKey(), tracker);
+        }
+    }
+
+    private ServiceTracker<Object, Object> createCrossFrameworkPublisher(Entry<UUID, Framework> entry,
+            BundleContext context) {
+        ServiceTracker<Object, Object> tracker = new ServiceTracker<Object, Object>(context, 
+                REMOTE_BUS, null) {
+            
+            Map<UUID, ServiceRegistration<?>> registered = new ConcurrentHashMap<>();
+
+                    @Override
+                    public Object addingService(ServiceReference<Object> reference) {
+                        
+                        if(reference.getBundle().getBundleId() == 0) {
+                            return null;
+                        }
+                        
+                        Object service = super.addingService(reference);
+
+                        for (Entry<UUID, Framework> e : frameworks.entrySet()) {
+                            UUID fwkId = entry.getKey();
+                            if(fwkId.equals(e.getKey())) {
+                                // Skip this framework as it's the same framework the service came from
+                                continue;
+                            }
+                            
+                            Framework fw = e.getValue();
+                            
+                            registered.put(fwkId, fw.getBundleContext().registerService(
+                                    REMOTE_BUS, new EventHandlerFactory(service, REMOTE_BUS), 
+                                    getRegistrationProps(reference)));
+                        }
+                        
+                        return service;
+                    }
+                    
+                    Dictionary<String, Object> getRegistrationProps(ServiceReference<?> ref) {
+                        Dictionary<String, Object> toReturn = new Hashtable<String, Object>();
+                        String[] props = ref.getPropertyKeys();
+                        for(String key : props) {
+                            toReturn.put(key, ref.getProperty(key));
+                        }
+                        
+                        toReturn.put("service.imported", true);
+                        return toReturn;
+                    }
+
+                    @Override
+                    public void modifiedService(ServiceReference<Object> reference, Object service) {
+                        for(ServiceRegistration<?> reg : registered.values()) {
+                            reg.setProperties(getRegistrationProps(reference));
+                        }
+                    }
+
+                    @Override
+                    public void removedService(ServiceReference<Object> reference, Object service) {
+                        for (ServiceRegistration<?> registration : registered.values()) {
+                            try {
+                                registration.unregister();
+                            } catch (Exception e) {
+                                // Never mind
+                            }
+                        }
+                        registered.clear();
+                        super.removedService(reference, service);
+                    }
+            
+        };
+        tracker.open(true);
+        return tracker;
+    }
+    
+    @AfterEach
+    public void shutdownFrameworks() throws Exception {
+        
+        frameworks.remove(getMasterFrameworkUUID());
+        
+        remoteServicePublishers.values().forEach(ServiceTracker::close);
+        remoteServicePublishers.clear();
+        
+        frameworks.values().forEach(f -> {
+            try {
+                f.stop();
+            } catch (BundleException be) {
+                // Never mind
+            }
+        });
+        
+        frameworks.clear();
+        
+        mocks.close();
+    }
+
+    private Map<UUID, Framework> createFrameworks(int size) throws BundleException {
+        
+        FrameworkFactory ff = ServiceLoader.load(FrameworkFactory.class, 
+                FrameworkFactory.class.getClassLoader()).iterator().next();
+        
+        List<String> locations = new ArrayList<>();
+        
+        for(Bundle b : bundleContext.getBundles()) {
+            if(
+                    b.getSymbolicName().equals("org.apache.aries.typedevent.bus") ||
+                    b.getSymbolicName().equals("org.apache.aries.typedevent.remote.api") ||
+                    b.getSymbolicName().equals("org.apache.aries.typedevent.remote.spi") ||
+                    b.getSymbolicName().equals("org.apache.aries.typedevent.remote.remoteservices") ||
+                    b.getSymbolicName().equals("org.apache.aries.component-dsl.component-dsl") ||
+                    b.getSymbolicName().equals("org.apache.felix.converter") ||
+                    b.getSymbolicName().equals("org.apache.felix.configadmin") ||
+                    b.getSymbolicName().equals("org.osgi.service.typedevent") ||
+                    b.getSymbolicName().equals("org.osgi.util.function") ||
+                    b.getSymbolicName().equals("org.osgi.util.promise") ||
+                    b.getSymbolicName().equals("org.osgi.util.pushstream") ||
+                    b.getSymbolicName().equals("slf4j.api") ||
+                    b.getSymbolicName().startsWith("ch.qos.logback")) {
+                locations.add(b.getLocation());
+            }
+        }
+        
+        Map<UUID, Framework> frameworks = new HashMap<UUID, Framework>();
+        for(int i = 1; i < size; i++) {
+            Map<String, String> fwConfig = new HashMap<>();
+            fwConfig.put(Constants.FRAMEWORK_STORAGE, new File(bundleContext.getDataFile(""), "Test-Cluster" + i).getAbsolutePath());
+            fwConfig.put(Constants.FRAMEWORK_STORAGE_CLEAN, Constants.FRAMEWORK_STORAGE_CLEAN_ONFIRSTINIT);
+            Framework f = ff.newFramework(fwConfig);
+            f.init();
+            for(String s : locations) {
+                f.getBundleContext().installBundle(s);
+            }
+            f.start();
+            f.adapt(FrameworkWiring.class).resolveBundles(Collections.emptySet());
+            for(Bundle b : f.getBundleContext().getBundles()) {
+                if(b.getHeaders().get(Constants.FRAGMENT_HOST) == null) {
+                    b.start();
+                }
+            }
+            frameworks.put(getUUID(f), f);
+        }
+        return frameworks;
+    }
+
+    private UUID getMasterFrameworkUUID() {
+        return UUID.fromString(bundleContext.getProperty(Constants.FRAMEWORK_UUID));
+    }
+    
+    private UUID getUUID(Framework f) {
+        return UUID.fromString(f.getBundleContext().getProperty(Constants.FRAMEWORK_UUID));
+    }
+
+
+    public static class EventHandlerFactory implements ServiceFactory<Object> {
+
+        private final Object delegate;
+        private final String typeToMimic;
+        
+        public EventHandlerFactory(Object delegate, String typeToMimic) {
+            this.delegate = delegate;
+            this.typeToMimic = typeToMimic;
+        }
+
+        @Override
+        public Object getService(Bundle bundle, ServiceRegistration<Object> registration) {
+            
+            try {
+                Class<?> loadClass = bundle.loadClass(typeToMimic);
+                
+                return Proxy.newProxyInstance(loadClass.getClassLoader(), new Class<?>[] {loadClass}, 
+                        (o,m,a) -> {
+                            
+                            if(m.getName().startsWith("notify") && m.getParameterTypes().length > 0) {
+                                return delegate.getClass().getMethod(m.getName(), m.getParameterTypes())
+                                        .invoke(delegate, a);
+                            } else {
+                                return m.invoke(delegate, a);
+                            }
+                        });
+                
+            } catch (Exception e) {
+                throw new ServiceException("failed to create service", e);
+            }
+        }
+
+        @Override
+        public void ungetService(Bundle bundle, ServiceRegistration<Object> registration, Object service) {
+            // TODO Auto-generated method stub
+            
+        }
+        
+    }
+    
+    @Test
+    public void testSendToRemoteFramework() throws InterruptedException {
+        
+        Dictionary<String, Object> props = new Hashtable<>();
+        regs.add(bundleContext.registerService(UNHANDLED_HANDLER, unhandledEventHandler, props));
+        
+        TestEvent event = new TestEvent();
+        event.message = "boo";
+        
+        bus.deliver(event);
+        
+        
+        verify(unhandledEventHandler, Mockito.after(100).times(1))
+            .notifyUnhandled(eq(TEST_EVENT_TOPIC), argThat(isUntypedTestEventWithMessage("boo")));
+        
+        
+        BundleContext remoteContext = frameworks.values().stream()
+                .filter(fw -> !getUUID(fw).equals(getMasterFrameworkUUID()))
+                .flatMap(fw -> Arrays.stream(fw.getBundleContext().getBundles()))
+                .filter(b -> b.getSymbolicName().equals("org.osgi.service.typedevent"))
+                .map(Bundle::getBundleContext)
+                .findFirst()
+                .get();
+        
+        props = new Hashtable<>();
+        props.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
+        props.put(TYPED_EVENT_FILTER, "(message=boo)");
+        
+        regs.add(remoteContext.registerService(UNTYPED_HANDLER, 
+                new EventHandlerFactory(untypedEventHandler, UNTYPED_HANDLER), props));
+        
+        
+        bus.deliver(event);
+        
+        verify(unhandledEventHandler, Mockito.after(1000).times(1))
+            .notifyUnhandled(eq(TEST_EVENT_TOPIC), argThat(isUntypedTestEventWithMessage("boo")));
+        
+        verify(untypedEventHandler)
+            .notifyUntyped(eq(TEST_EVENT_TOPIC), argThat(isUntypedTestEventWithMessage("boo")));
+    }
+    
+}
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/test.bndrun b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/test.bndrun
new file mode 100644
index 0000000..704e7b4
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/test.bndrun
@@ -0,0 +1,56 @@
+#    Licensed to the Apache Software Foundation (ASF) under one
+#    or more contributor license agreements.  See the NOTICE file
+#    distributed with this work for additional information
+#    regarding copyright ownership.  The ASF licenses this file
+#    to you under the Apache License, Version 2.0 (the
+#    "License"); you may not use this file except in compliance
+#    with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing,
+#    software distributed under the License is distributed on an
+#    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#    KIND, either express or implied.  See the License for the
+#    specific language governing permissions and limitations
+#    under the License.
+
+-tester: biz.aQute.tester.junit-platform
+
+-runfw: org.apache.felix.framework
+
+-runrequires: bnd.identity;id="org.apache.aries.typedevent.remote.remoteservices-tests",\
+  bnd.identity;id="junit-jupiter-engine",\
+  bnd.identity;id="junit-platform-launcher"
+ 
+-runsystempackages: sun.reflect
+
+-resolve.effective: active
+-runbundles: \
+    ch.qos.logback.classic;version='[1.2.3,1.2.4)',\
+    ch.qos.logback.core;version='[1.2.3,1.2.4)',\
+    org.apache.aries.component-dsl.component-dsl;version='[1.2.2,1.2.3)',\
+    org.apache.felix.configadmin;version='[1.9.18,1.9.19)',\
+    org.apache.felix.converter;version='[1.0.14,1.0.15)',\
+    org.osgi.service.typedevent;version='[1.0.0,1.0.1)',\
+    org.osgi.util.function;version='[1.1.0,1.1.1)',\
+    org.osgi.util.promise;version='[1.1.1,1.1.2)',\
+    org.osgi.util.pushstream;version='[1.0.1,1.0.2)',\
+    slf4j.api;version='[1.7.30,1.7.31)',\
+    junit-jupiter-api;version='[5.6.2,5.6.3)',\
+    junit-platform-commons;version='[1.6.2,1.6.3)',\
+    net.bytebuddy.byte-buddy;version='[1.10.13,1.10.14)',\
+    net.bytebuddy.byte-buddy-agent;version='[1.10.13,1.10.14)',\
+    org.mockito.mockito-core;version='[3.5.10,3.5.11)',\
+    org.objenesis;version='[3.1.0,3.1.1)',\
+    org.opentest4j;version='[1.2.0,1.2.1)',\
+    org.osgi.test.common;version='[0.9.0,0.9.1)',\
+    org.osgi.test.junit5;version='[0.9.0,0.9.1)',\
+    junit-platform-engine;version='[1.6.2,1.6.3)',\
+    junit-platform-launcher;version='[1.6.2,1.6.3)',\
+    junit-jupiter-engine;version='[5.6.2,5.6.3)',\
+    org.apache.aries.typedevent.remote.api;version='[0.0.1,0.0.2)',\
+    org.apache.aries.typedevent.remote.remoteservices;version='[0.0.1,0.0.2)',\
+    org.apache.aries.typedevent.remote.remoteservices-tests;version='[0.0.1,0.0.2)',\
+    org.apache.aries.typedevent.remote.spi;version='[0.0.1,0.0.2)',\
+    org.apache.aries.typedevent.bus;version='[0.0.1,0.0.2)'
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/pom.xml b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/pom.xml
new file mode 100644
index 0000000..a288407
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/pom.xml
@@ -0,0 +1,50 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.aries.typedevent</groupId>
+        <artifactId>typedevent-parent</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    
+    <groupId>org.apache.aries.typedevent.remote</groupId>
+    <artifactId>org.apache.aries.typedevent.remote.spi</artifactId>
+    
+    <dependencies>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.service.typedevent</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>osgi.annotation</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.annotation.bundle</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.felix</groupId>
+            <artifactId>org.apache.felix.converter</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.aries.typedevent.remote</groupId>
+            <artifactId>org.apache.aries.typedevent.remote.api</artifactId>
+            <version>0.0.1-SNAPSHOT</version>
+        </dependency>
+    </dependencies>
+    
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>biz.aQute.bnd</groupId>
+                <artifactId>bnd-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/src/main/java/org/apache/aries/typedevent/remote/spi/LocalEventConsumerManager.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/src/main/java/org/apache/aries/typedevent/remote/spi/LocalEventConsumerManager.java
new file mode 100644
index 0000000..ff3a13e
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/src/main/java/org/apache/aries/typedevent/remote/spi/LocalEventConsumerManager.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.remote.spi;
+
+import static java.lang.Boolean.TRUE;
+import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_FILTER;
+import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_TOPICS;
+
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.typedevent.UntypedEventHandler;
+
+/**
+ * A simple helper class used to manage the registrations of {@link UntypedEventHandler}
+ * services in the local service registry, used to feed events into the remote events
+ * implementation.
+ * 
+ * Implementations should extend this type and override the {@link #notifyUntyped(String, Map)} method
+ * to receive events. The set of events received can be altered by calling {@link #updateTargets(Map)}.
+ */
+public abstract class LocalEventConsumerManager implements UntypedEventHandler {
+    
+    /**
+     * A service property indicating that the event handler is a proxy created for a remote node and so
+     * should not be considered as a local interest.
+     */
+    public static final String ARIES_LOCAL_EVENT_PROXY = "org.apache.aries.typedevent.remote.spi.local.proxy";
+
+    /**
+     * A filter to exclude local proxy interests from remote nodes
+     */
+    public static final String ARIES_LOCAL_EVENT_PROXY_EXCLUSION_FILTER = "(!(" + ARIES_LOCAL_EVENT_PROXY + "=true))";
+    
+    private final Object lock = new Object();
+    private final Map<String, ServiceRegistration<UntypedEventHandler>> listenerRegistrations = new HashMap<>();
+    private final Map<String, String> topicsToFilters = new HashMap<>();
+    private BundleContext ctx;
+    
+    /**
+     * Starts this manager, registering any necessary whiteboard services with the
+     * appropriate topic and filters;
+     * @param ctx
+     */
+    public final void start(BundleContext ctx) {
+        synchronized (lock) {
+            this.ctx = ctx;
+        }
+        updateServiceRegistrations();
+    }
+
+    /**
+     * Stops this manager, unregistering any whiteboard services
+     */
+    public final void stop() {
+        synchronized (lock) {
+            this.ctx = null;
+        }
+        Map<String, ServiceRegistration<UntypedEventHandler>> toUnregister;
+        synchronized (lock) {
+            toUnregister = new HashMap<>(listenerRegistrations);
+            listenerRegistrations.clear();
+        }
+        toUnregister.values().stream().forEach(this::safeUnregister);
+    }
+    
+    
+    private void updateServiceRegistrations() {
+        Map<String, String> possibleUpdates = new HashMap<String, String>();
+        Map<String, ServiceRegistration<UntypedEventHandler>> toUnregister;
+        synchronized (lock) {
+            possibleUpdates = new HashMap<>(topicsToFilters);
+            toUnregister = listenerRegistrations.entrySet().stream()
+                    .filter(e -> !topicsToFilters.containsKey(e.getKey()))
+                    .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+            listenerRegistrations.keySet().removeAll(toUnregister.keySet());
+        }
+        
+        toUnregister.values().stream().forEach(this::safeUnregister);
+        
+        for (Entry<String, String> entry : possibleUpdates.entrySet()) {
+            
+            String topic = entry.getKey();
+            String filter = entry.getValue();
+            
+            ServiceRegistration<UntypedEventHandler> reg;
+            BundleContext ctx;
+            synchronized (lock) {
+                reg = listenerRegistrations.get(topic);    
+                ctx = this.ctx;
+            }
+            
+            if(reg == null) {
+                if(ctx != null) {
+                    Dictionary<String, Object> props = new Hashtable<>();
+                    props.put(TYPED_EVENT_TOPICS, topic);
+                    props.put(ARIES_LOCAL_EVENT_PROXY, TRUE);
+                    if(filter != null && !filter.contentEquals("")) {
+                        props.put(TYPED_EVENT_FILTER, filter);
+                    }
+                    reg = ctx.registerService(UntypedEventHandler.class, this, props);
+                    
+                    synchronized (lock) {
+                        ServiceRegistration<UntypedEventHandler> oldReg = listenerRegistrations.putIfAbsent(topic, reg);
+                        if(oldReg == null) {
+                            reg = null;
+                        }
+                    }
+                    if(reg != null) {
+                        reg.unregister();
+                    }
+                }
+            } else if(ctx != null) {
+                
+                Dictionary<String, Object> props = new Hashtable<>();
+                props.put(TYPED_EVENT_TOPICS, topic);
+                props.put(ARIES_LOCAL_EVENT_PROXY, TRUE);
+                if(filter != null && !filter.contentEquals("")) {
+                    if(filter.equals(reg.getReference().getProperty(TYPED_EVENT_FILTER))) {
+                        // Filter unchanged - no need to update
+                        continue;
+                    }
+                    props.put(TYPED_EVENT_FILTER, filter);
+                } else if (reg.getReference().getProperty(TYPED_EVENT_FILTER) == null) {
+                    // Filter unchanged - no need to update
+                    continue;
+                }
+                reg.setProperties(props);
+            }
+        }
+        
+        boolean changed;
+        synchronized (lock) {
+            changed = !possibleUpdates.equals(topicsToFilters);
+        }
+        if(changed) {
+            updateServiceRegistrations();
+        }
+    }
+    
+    private void safeUnregister(ServiceRegistration<?> reg) {
+        try {
+            reg.unregister();
+        } catch (IllegalStateException ise) {
+            // Just ignore it
+        }
+    }
+    
+    /**
+     * Set the topic and filter targets for which whiteboard listeners
+     * should be registered
+     * @param updated - A Map of topic names (or globs) to filters
+     */
+    protected final void updateTargets(Map<String, String> updated) {
+        synchronized (lock) {
+            topicsToFilters.clear();
+            topicsToFilters.putAll(updated);
+        }
+        
+        updateServiceRegistrations();
+    }
+    
+}
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/src/main/java/org/apache/aries/typedevent/remote/spi/RemoteEventMonitorImpl.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/src/main/java/org/apache/aries/typedevent/remote/spi/RemoteEventMonitorImpl.java
new file mode 100644
index 0000000..61ef87f
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/src/main/java/org/apache/aries/typedevent/remote/spi/RemoteEventMonitorImpl.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.remote.spi;
+
+import static org.apache.aries.typedevent.remote.api.RemoteEventConstants.REMOTE_EVENT_MARKER;
+
+import java.time.Instant;
+import java.util.AbstractMap;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.aries.typedevent.remote.api.FilterDTO;
+import org.apache.aries.typedevent.remote.api.RemoteEventMonitor;
+import org.apache.aries.typedevent.remote.api.RemoteMonitorEvent;
+import org.apache.aries.typedevent.remote.api.RemoteMonitorEvent.PublishType;
+import org.osgi.framework.Filter;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.service.typedevent.monitor.MonitorEvent;
+import org.osgi.service.typedevent.monitor.TypedEventMonitor;
+import org.osgi.util.converter.Converters;
+import org.osgi.util.function.Predicate;
+import org.osgi.util.pushstream.PushStream;
+
+public class RemoteEventMonitorImpl implements RemoteEventMonitor {
+
+    private final TypedEventMonitor monitor;
+
+    public RemoteEventMonitorImpl(TypedEventMonitor monitor) {
+
+        this.monitor = monitor;
+    }
+
+    private static RemoteMonitorEvent toRemoteEvent(MonitorEvent event) {
+
+        Object remoteMarker = event.eventData.get(REMOTE_EVENT_MARKER);
+        
+        RemoteMonitorEvent me = Converters.standardConverter().convert(event).sourceAsDTO().targetAsDTO().to(RemoteMonitorEvent.class);
+        me.publishType = Boolean.valueOf(String.valueOf(remoteMarker)) ? PublishType.REMOTE : PublishType.LOCAL;
+
+        return me;
+    }
+
+    @Override
+    public PushStream<RemoteMonitorEvent> monitorEvents(FilterDTO... filters) {
+        return monitorEvents(0, filters);
+    }
+
+    @Override
+    public PushStream<RemoteMonitorEvent> monitorEvents(int history, FilterDTO...filters) {
+        return monitor.monitorEvents(history)
+                .map(RemoteEventMonitorImpl::toRemoteEvent)
+                .filter(createFilter(filters));
+    }
+
+    @Override
+    public PushStream<RemoteMonitorEvent> monitorEvents(Instant history, FilterDTO...filters) {
+        return monitor.monitorEvents(history)
+                .map(RemoteEventMonitorImpl::toRemoteEvent)
+                .filter(createFilter(filters));
+    }
+
+    private class FilterPair {
+        Filter ldap;
+        Pattern regex;
+
+        FilterPair(FilterDTO filter) {
+            if (filter.ldapExpression != null && !filter.ldapExpression.isEmpty()) {
+                try {
+                    ldap = FrameworkUtil.createFilter(filter.ldapExpression);
+                } catch (InvalidSyntaxException e) {
+                    throw new IllegalArgumentException(e);
+                }
+            }
+
+            if (filter.regularExpression != null && !filter.regularExpression.isEmpty()) {
+                regex = Pattern.compile(filter.regularExpression);
+            }
+        }
+    }
+
+    private Predicate<RemoteMonitorEvent> createFilter(FilterDTO... filters) {
+        List<FilterPair> filterPairs = Arrays.asList(filters).stream()
+                .map(FilterPair::new).collect(Collectors.toList());
+
+        if (filterPairs.isEmpty()) {
+            return x -> true;
+        }
+
+        return event -> {
+            // We use a TreeMap to ensure predictable ordering of keys
+            // This is important for the regex matching contract.
+
+            SortedMap<String, Object> toFilter = new TreeMap<>();
+
+            // Using a collector blew up with null values, even though they are
+            // supported by the TreeMap
+            event.eventData.entrySet().stream()
+                    .flatMap(e -> flatten("", e))
+                    .forEach(e -> toFilter.put(e.getKey(), e.getValue()));
+
+            toFilter.put("-topic", event.topic);
+            toFilter.put("-publishType", event.publishType);
+
+            StringBuilder eventText = new StringBuilder();
+
+            if (filterPairs.stream().anyMatch(p -> p.regex != null)) {
+                toFilter.forEach((k, v) -> {
+                    eventText.append(k).append(':').append(v).append(',');
+                });
+            }
+
+            // If a FilterDTO contains both LDAP and regular expressions, then both must match.
+            return filterPairs.stream().anyMatch(p ->
+                    (p.ldap == null || p.ldap.matches(toFilter)) &&
+                    (p.regex == null || p.regex.matcher(eventText).find())
+            );
+        };
+    }
+
+    private Stream<Entry<String, Object>> flatten(String parentScope,
+            Entry<String, Object> entry) {
+
+        if (entry.getValue() instanceof Map) {
+
+            String keyPrefix = parentScope + entry.getKey() + ".";
+
+            @SuppressWarnings("unchecked")
+            Map<String, Object> subMap = (Map<String, Object>) entry.getValue();
+
+            // Recursively flatten maps that are inside our map
+            return subMap.entrySet().stream()
+                .flatMap(e -> flatten(keyPrefix, e));
+        } else if(parentScope.isEmpty()) {
+            // Fast path for top-level entries
+            return Stream.of(entry);
+        } else {
+            // Map the key of a nested entry into x.y.z
+            return Stream.of(new AbstractMap.SimpleEntry<>(
+                    parentScope + entry.getKey(), entry.getValue()));
+        }
+
+    }
+
+}
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/src/main/java/org/apache/aries/typedevent/remote/spi/package-info.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/src/main/java/org/apache/aries/typedevent/remote/spi/package-info.java
new file mode 100644
index 0000000..a08926c
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/src/main/java/org/apache/aries/typedevent/remote/spi/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@org.osgi.annotation.bundle.Export
+@org.osgi.annotation.versioning.Version("0.0.1")
+package org.apache.aries.typedevent.remote.spi;
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.remote/pom.xml b/org.apache.aries.typedevent.remote/pom.xml
new file mode 100644
index 0000000..79ff373
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/pom.xml
@@ -0,0 +1,21 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.aries.typedevent</groupId>
+        <artifactId>typedevent-parent</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    
+    <groupId>org.apache.aries.typedevent.remote</groupId>
+    <artifactId>org.apache.aries.typedevent.remote</artifactId>
+    <packaging>pom</packaging>
+    
+    
+    <modules>
+      <module>org.apache.aries.typedevent.remote.api</module>
+      <module>org.apache.aries.typedevent.remote.spi</module>
+      <module>org.apache.aries.typedevent.remote.remoteservices</module>
+    </modules>
+</project>
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index dfb1c98..7e89a7d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,6 +45,18 @@
                 <enabled>false</enabled>
             </releases>
         </repository>
+        <repository>
+            <id>Sonatype-snapshots</id>
+            <url>https://oss.sonatype.org/content/repositories/snapshots</url>
+            <snapshots>
+                <enabled>true</enabled>
+                <updatePolicy>daily</updatePolicy>
+                <checksumPolicy>ignore</checksumPolicy>
+            </snapshots>
+            <releases>
+                <enabled>false</enabled>
+            </releases>
+        </repository>
     </repositories>
 
     <properties>
@@ -322,5 +334,6 @@
     <modules>
         <module>typedevent-test-bom</module>
         <module>org.apache.aries.typedevent.bus</module>
+        <module>org.apache.aries.typedevent.remote</module>
     </modules>
 </project>


[aries-typedevent] 01/02: Correct the JavaDoc in unit tests

Posted by ti...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

timothyjward pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/aries-typedevent.git

commit c9f16c14604b85f0aa2c18f55b72169ff1471ee6
Author: Tim Ward <ti...@apache.org>
AuthorDate: Tue Sep 29 10:05:44 2020 +0100

    Correct the JavaDoc in unit tests
---
 .../org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java
index 7b3996b..6667969 100644
--- a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java
+++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java
@@ -246,7 +246,7 @@ public class TypedEventBusImplTest {
     }
 
     /**
-     * Tests that events are delivered to Smart Behaviours based on type
+     * Tests that events are delivered to Event Handlers based on type
      * 
      * @throws InterruptedException
      */