You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by ti...@apache.org on 2020/10/01 14:26:08 UTC
[aries-typedevent] branch main updated: Allow the remoting implementation to configure which listener interests are made available remotely
This is an automated email from the ASF dual-hosted git repository.
timothyjward pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/aries-typedevent.git
The following commit(s) were added to refs/heads/main by this push:
new 413c2e8 Allow the remoting implementation to configure which listener interests are made available remotely
413c2e8 is described below
commit 413c2e8be1b140cc3f9869ca168f190da6d64258
Author: Tim Ward <ti...@apache.org>
AuthorDate: Thu Oct 1 15:10:27 2020 +0100
Allow the remoting implementation to configure which listener interests are made available remotely
---
.../remote/remoteservices/impl/Config.java | 13 +++++
.../remoteservices/impl/RemoteEventBusImpl.java | 59 +++++++++++++++++++---
.../impl/RemoteServiceEventsActivator.java | 13 ++---
.../impl/RemoteEventBusImplTest.java | 37 ++++++++++++--
.../osgi/RemoteEventBusIntegrationTest.java | 29 ++++++++++-
5 files changed, 134 insertions(+), 17 deletions(-)
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/Config.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/Config.java
new file mode 100644
index 0000000..b1b5342
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/Config.java
@@ -0,0 +1,13 @@
+package org.apache.aries.typedevent.remote.remoteservices.impl;
+
+public @interface Config {
+
+ public static enum Selector {
+ ALL, WITH_FILTER, WITH_PROPERTY, CUSTOM;
+ }
+
+ public Selector listener_selection() default Selector.WITH_PROPERTY;
+
+ public String listener_selection_custom_filter();
+
+}
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImpl.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImpl.java
index 67f63e4..48ce6bf 100644
--- a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImpl.java
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImpl.java
@@ -19,13 +19,16 @@ package org.apache.aries.typedevent.remote.remoteservices.impl;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
+import static org.apache.aries.typedevent.remote.api.RemoteEventConstants.RECEIVE_REMOTE_EVENTS;
import static org.osgi.namespace.service.ServiceNamespace.SERVICE_NAMESPACE;
+import static org.osgi.util.converter.Converters.standardConverter;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.aries.typedevent.remote.api.RemoteEventConstants;
@@ -57,9 +60,16 @@ public class RemoteEventBusImpl implements RemoteEventBus {
private final Map<Long, Map<String, Filter>> servicesToInterests = new HashMap<>();
private final Object lock = new Object();
-
- public RemoteEventBusImpl(TypedEventBus eventBus) {
+
+ private final Config configuration;
+
+ public RemoteEventBusImpl(TypedEventBus eventBus, Map<String, ?> config) {
this.eventBus = eventBus;
+
+ Map<String, Object> configWithDefaults = new HashMap<String, Object>(config);
+ configWithDefaults.putIfAbsent("listener.selection", Config.Selector.WITH_PROPERTY);
+
+ this.configuration = standardConverter().convert(configWithDefaults).to(Config.class);
}
public void init(BundleContext ctx) {
@@ -119,18 +129,53 @@ public class RemoteEventBusImpl implements RemoteEventBus {
* @param topics
* @param filter
*/
- void updateLocalInterest(Long id, List<String> topics, Filter filter) {
-
- boolean doUpdate = false;
+ void updateLocalInterest(Long id, List<String> topics, Filter filter, Map<String, ?> serviceProps) {
- Map<String, Filter> newData = topics.stream()
+ Map<String, Filter> newData;
+ Supplier<Map<String, Filter>> fromTopics = () -> topics.stream()
.collect(toMap(identity(), x -> filter, (a,b) -> a));
+ switch(configuration.listener_selection()) {
+ case ALL:
+ newData = fromTopics.get();
+ break;
+ case CUSTOM:
+ String listenerFilterString = configuration.listener_selection_custom_filter();
+ try {
+ Filter listenerFilter = FrameworkUtil.createFilter(listenerFilterString);
+
+ if(listenerFilter.matches(serviceProps)) {
+ newData = fromTopics.get();
+ break;
+ }
+ } catch (InvalidSyntaxException ise) {
+ //TODO log that this is ignored;
+ }
+ newData = new HashMap<>();
+ break;
+ case WITH_FILTER:
+ newData = filter == null ? new HashMap<>() : fromTopics.get();
+ break;
+ case WITH_PROPERTY:
+ boolean hasProperty = Boolean.valueOf(String.valueOf(serviceProps.get(RECEIVE_REMOTE_EVENTS)));
+ newData = hasProperty ? fromTopics.get() : new HashMap<>();
+ break;
+ default:
+ newData = new HashMap<>();
+ break;
+
+ }
+
+ boolean doUpdate;
Map<String, Filter> updatedFilters;
synchronized(lock) {
- doUpdate = true;
servicesToInterests.put(id, newData);
+
+ Map<String, Filter> tmpFilters = topicsToFilters;
topicsToFilters = getUpdatedFilters();
+
+ doUpdate = !tmpFilters.equals(topicsToFilters);
+
updatedFilters = topicsToFilters;
}
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteServiceEventsActivator.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteServiceEventsActivator.java
index 83b80dd..130d839 100644
--- a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteServiceEventsActivator.java
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteServiceEventsActivator.java
@@ -95,7 +95,8 @@ public class RemoteServiceEventsActivator implements BundleActivator {
.flatMap(remi -> register(RemoteEventMonitor.class, remi, new HashMap<>()));
OSGi<Object> remote = bundleContext().flatMap(ctx -> service(once(serviceReferences(TypedEventBus.class)))
- .map(RemoteEventBusImpl::new).effects(rebi -> rebi.init(ctx), rebi -> rebi.destroy())
+ .map(teb -> new RemoteEventBusImpl(teb, configuration))
+ .effects(rebi -> rebi.init(ctx), rebi -> rebi.destroy())
.flatMap(rebi -> all(
just(new UntypedEventTracker(ctx, rebi)).map(ServiceTracker.class::cast)
.effects(st -> st.open(), st -> st.close()),
@@ -124,7 +125,7 @@ public class RemoteServiceEventsActivator implements BundleActivator {
return map;
}
- private Map<String, Object> getServiceProps(ServiceReference<?> ref) {
+ private static Map<String, Object> getServiceProps(ServiceReference<?> ref) {
return Arrays.stream(ref.getPropertyKeys()).collect(Collectors.toMap(identity(), ref::getProperty));
}
@@ -185,7 +186,7 @@ public class RemoteServiceEventsActivator implements BundleActivator {
// TODO Auto-generated catch block
return reference;
}
- impl.updateLocalInterest(getServiceId(reference), getTopics(reference), filter);
+ impl.updateLocalInterest(getServiceId(reference), getTopics(reference), filter, getServiceProps(reference));
return reference;
}
@@ -199,7 +200,7 @@ public class RemoteServiceEventsActivator implements BundleActivator {
impl.removeLocalInterest(getServiceId(reference));
return;
}
- impl.updateLocalInterest(getServiceId(reference), getTopics(reference), filter);
+ impl.updateLocalInterest(getServiceId(reference), getTopics(reference), filter, getServiceProps(reference));
}
@Override
@@ -230,7 +231,7 @@ public class RemoteServiceEventsActivator implements BundleActivator {
}
List<String> topics = findTopics(reference, toReturn);
if (!topics.isEmpty()) {
- impl.updateLocalInterest(getServiceId(reference), topics, filter);
+ impl.updateLocalInterest(getServiceId(reference), topics, filter, getServiceProps(reference));
}
return toReturn;
}
@@ -320,7 +321,7 @@ public class RemoteServiceEventsActivator implements BundleActivator {
if (topics.isEmpty()) {
impl.removeLocalInterest(getServiceId(reference));
} else {
- impl.updateLocalInterest(getServiceId(reference), getTopics(reference), filter);
+ impl.updateLocalInterest(getServiceId(reference), getTopics(reference), filter, getServiceProps(reference));
}
}
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImplTest.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImplTest.java
index 9ddd8e1..abe7b7d 100644
--- a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImplTest.java
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImplTest.java
@@ -16,13 +16,18 @@
*/
package org.apache.aries.typedevent.remote.remoteservices.impl;
+import static java.lang.Boolean.TRUE;
import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonMap;
+import static org.apache.aries.typedevent.remote.api.RemoteEventConstants.RECEIVE_REMOTE_EVENTS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.osgi.framework.FrameworkUtil.createFilter;
import java.util.Arrays;
import java.util.Dictionary;
+import java.util.HashMap;
import org.apache.aries.typedevent.remote.remoteservices.spi.RemoteEventBus;
import org.junit.jupiter.api.AfterEach;
@@ -65,7 +70,7 @@ public class RemoteEventBusImplTest {
Mockito.any(RemoteEventBus.class), Mockito.any())).thenReturn(remoteReg);
Mockito.when(remoteReg.getReference()).thenReturn(remoteRef);
- remoteImpl = new RemoteEventBusImpl(eventBusImpl);
+ remoteImpl = new RemoteEventBusImpl(eventBusImpl, new HashMap<>());
}
@@ -98,7 +103,8 @@ public class RemoteEventBusImplTest {
@Test
public void testStartWithDetails() throws InvalidSyntaxException {
- remoteImpl.updateLocalInterest(42L, Arrays.asList("FOO"), createFilter("(fizz=buzz)"));
+ remoteImpl.updateLocalInterest(42L, Arrays.asList("FOO"), createFilter("(fizz=buzz)"),
+ singletonMap(RECEIVE_REMOTE_EVENTS, TRUE));
remoteImpl.init(context);
@@ -139,7 +145,8 @@ public class RemoteEventBusImplTest {
// Add a listener to the remote
- remoteImpl.updateLocalInterest(42L, Arrays.asList("FOO"), createFilter("(fizz=buzz)"));
+ remoteImpl.updateLocalInterest(42L, Arrays.asList("FOO"), createFilter("(fizz=buzz)"),
+ singletonMap(RECEIVE_REMOTE_EVENTS, TRUE));
Mockito.verify(remoteReg, Mockito.times(2)).setProperties(propsCaptor.capture());
@@ -148,4 +155,28 @@ public class RemoteEventBusImplTest {
assertEquals(RemoteEventBus.class.getName(), props.get("service.exported.interfaces"));
assertEquals(Arrays.asList("FOO=(fizz=buzz)"), props.get(RemoteEventBus.REMOTE_EVENT_FILTERS));
}
+
+ @Test
+ public void testStartWithNonRemoteListener() throws InvalidSyntaxException {
+
+ remoteImpl.updateLocalInterest(42L, Arrays.asList("FOO"), createFilter("(fizz=buzz)"),
+ emptyMap());
+
+ remoteImpl.init(context);
+
+ ArgumentCaptor<Dictionary<String, Object>> propsCaptor = ArgumentCaptor.forClass(Dictionary.class);
+
+ Mockito.verify(context).registerService(Mockito.eq(RemoteEventBus.class), Mockito.same(remoteImpl),
+ propsCaptor.capture());
+
+ Dictionary<String, Object> props = propsCaptor.getValue();
+ assertNull(props);
+
+ Mockito.verify(remoteReg).setProperties(propsCaptor.capture());
+
+ props = propsCaptor.getValue();
+
+ assertEquals(RemoteEventBus.class.getName(), props.get("service.exported.interfaces"));
+ assertEquals(emptyList(), props.get(RemoteEventBus.REMOTE_EVENT_FILTERS));
+ }
}
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/osgi/RemoteEventBusIntegrationTest.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/osgi/RemoteEventBusIntegrationTest.java
index 9788848..c097a62 100644
--- a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/osgi/RemoteEventBusIntegrationTest.java
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/osgi/RemoteEventBusIntegrationTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.aries.typedevent.remote.remoteservices.osgi;
+import static org.apache.aries.typedevent.remote.api.RemoteEventConstants.RECEIVE_REMOTE_EVENTS;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
@@ -91,7 +92,7 @@ public class RemoteEventBusIntegrationTest extends AbstractIntegrationTest {
TypedEventBus bus;
@Mock
- UntypedEventHandler untypedEventHandler;
+ UntypedEventHandler untypedEventHandler, untypedEventHandler2;
@Mock
UnhandledEventHandler unhandledEventHandler;
@@ -330,19 +331,45 @@ public class RemoteEventBusIntegrationTest extends AbstractIntegrationTest {
props = new Hashtable<>();
props.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
+ props.put(RECEIVE_REMOTE_EVENTS, true);
props.put(TYPED_EVENT_FILTER, "(message=boo)");
regs.add(remoteContext.registerService(UNTYPED_HANDLER,
new EventHandlerFactory(untypedEventHandler, UNTYPED_HANDLER), props));
+
+ props = new Hashtable<>();
+ props.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
+ props.put(RECEIVE_REMOTE_EVENTS, false);
+ props.put(TYPED_EVENT_FILTER, "(message=far)");
+
+ regs.add(remoteContext.registerService(UNTYPED_HANDLER,
+ new EventHandlerFactory(untypedEventHandler2, UNTYPED_HANDLER), props));
bus.deliver(event);
verify(unhandledEventHandler, Mockito.after(1000).times(1))
.notifyUnhandled(eq(TEST_EVENT_TOPIC), argThat(isUntypedTestEventWithMessage("boo")));
+
+ verify(untypedEventHandler2, Mockito.after(1000).never())
+ .notifyUntyped(eq(TEST_EVENT_TOPIC), argThat(isUntypedTestEventWithMessage("boo")));
verify(untypedEventHandler)
.notifyUntyped(eq(TEST_EVENT_TOPIC), argThat(isUntypedTestEventWithMessage("boo")));
+
+ event = new TestEvent();
+ event.message = "far";
+
+ bus.deliver(event);
+
+ verify(unhandledEventHandler, Mockito.after(100).times(1))
+ .notifyUnhandled(eq(TEST_EVENT_TOPIC), argThat(isUntypedTestEventWithMessage("far")));
+
+ verify(untypedEventHandler2, Mockito.after(1000).never())
+ .notifyUntyped(eq(TEST_EVENT_TOPIC), argThat(isUntypedTestEventWithMessage("far")));
+
+ verify(untypedEventHandler, Mockito.after(1000).never())
+ .notifyUntyped(eq(TEST_EVENT_TOPIC), argThat(isUntypedTestEventWithMessage("far")));
}
}
\ No newline at end of file