You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aries.apache.org by ti...@apache.org on 2020/10/01 14:26:08 UTC

[aries-typedevent] branch main updated: Allow the remoting implementation to configure which listener interests are made available remotely

This is an automated email from the ASF dual-hosted git repository.

timothyjward pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/aries-typedevent.git


The following commit(s) were added to refs/heads/main by this push:
     new 413c2e8  Allow the remoting implementation to configure which listener interests are made available remotely
413c2e8 is described below

commit 413c2e8be1b140cc3f9869ca168f190da6d64258
Author: Tim Ward <ti...@apache.org>
AuthorDate: Thu Oct 1 15:10:27 2020 +0100

    Allow the remoting implementation to configure which listener interests are made available remotely
---
 .../remote/remoteservices/impl/Config.java         | 13 +++++
 .../remoteservices/impl/RemoteEventBusImpl.java    | 59 +++++++++++++++++++---
 .../impl/RemoteServiceEventsActivator.java         | 13 ++---
 .../impl/RemoteEventBusImplTest.java               | 37 ++++++++++++--
 .../osgi/RemoteEventBusIntegrationTest.java        | 29 ++++++++++-
 5 files changed, 134 insertions(+), 17 deletions(-)

diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/Config.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/Config.java
new file mode 100644
index 0000000..b1b5342
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/Config.java
@@ -0,0 +1,13 @@
+package org.apache.aries.typedevent.remote.remoteservices.impl;
+/** Typed view of the remoting configuration; RemoteEventBusImpl obtains it by converting its configuration map. */
+public @interface Config {
+    /** Strategies for choosing which listener interests are made available remotely. */
+    public static enum Selector {
+        ALL, WITH_FILTER, WITH_PROPERTY, CUSTOM;
+    }
+    /** Selection strategy in use; WITH_PROPERTY when not configured. */
+    public Selector listener_selection() default Selector.WITH_PROPERTY;
+    /** Filter string matched against a listener's service properties when the strategy is CUSTOM; no default is declared. */
+    public String listener_selection_custom_filter();
+    
+}
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImpl.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImpl.java
index 67f63e4..48ce6bf 100644
--- a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImpl.java
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImpl.java
@@ -19,13 +19,16 @@ package org.apache.aries.typedevent.remote.remoteservices.impl;
 import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
+import static org.apache.aries.typedevent.remote.api.RemoteEventConstants.RECEIVE_REMOTE_EVENTS;
 import static org.osgi.namespace.service.ServiceNamespace.SERVICE_NAMESPACE;
+import static org.osgi.util.converter.Converters.standardConverter;
 
 import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import org.apache.aries.typedevent.remote.api.RemoteEventConstants;
@@ -57,9 +60,16 @@ public class RemoteEventBusImpl implements RemoteEventBus {
     private final Map<Long, Map<String, Filter>> servicesToInterests = new HashMap<>();
     
     private final Object lock = new Object();
-
-    public RemoteEventBusImpl(TypedEventBus eventBus) {
+    
+    private final Config configuration;
+    
+    public RemoteEventBusImpl(TypedEventBus eventBus, Map<String, ?> config) {
         this.eventBus = eventBus;
+        // Work on a copy so the default can be added without touching the caller's map
+        Map<String, Object> configWithDefaults = new HashMap<String, Object>(config);
+        configWithDefaults.putIfAbsent("listener.selection", Config.Selector.WITH_PROPERTY);
+        // Expose the map through the typed Config annotation using the standard converter
+        this.configuration = standardConverter().convert(configWithDefaults).to(Config.class);
     }
     
     public void init(BundleContext ctx) {
@@ -119,18 +129,53 @@ public class RemoteEventBusImpl implements RemoteEventBus {
      * @param topics
      * @param filter
      */
-    void updateLocalInterest(Long id, List<String> topics, Filter filter) {
-
-        boolean doUpdate = false;
+    void updateLocalInterest(Long id, List<String> topics, Filter filter, Map<String, ?> serviceProps) {
 
-        Map<String, Filter> newData = topics.stream()
+        Map<String, Filter> newData;
+        Supplier<Map<String, Filter>> fromTopics = () -> topics.stream()
                 .collect(toMap(identity(), x -> filter, (a,b) -> a));
         
+        switch(configuration.listener_selection()) {
+            case ALL:
+                newData = fromTopics.get();
+                break;
+            case CUSTOM:
+                String listenerFilterString = configuration.listener_selection_custom_filter();
+                try {
+                    Filter listenerFilter = FrameworkUtil.createFilter(listenerFilterString);
+                    
+                    if(listenerFilter.matches(serviceProps)) {
+                        newData = fromTopics.get();
+                        break;
+                    }
+                } catch (InvalidSyntaxException ise) {
+                    // TODO: log that the invalid custom listener filter is being ignored
+                }
+                newData = new HashMap<>();
+                break;
+            case WITH_FILTER:
+                newData = filter == null ? new HashMap<>() : fromTopics.get();
+                break;
+            case WITH_PROPERTY:
+                boolean hasProperty = Boolean.valueOf(String.valueOf(serviceProps.get(RECEIVE_REMOTE_EVENTS)));
+                newData = hasProperty ? fromTopics.get() : new HashMap<>();
+                break;
+            default:
+                newData = new HashMap<>();
+                break;
+        
+        }
+        
+        boolean doUpdate;
         Map<String, Filter> updatedFilters;
         synchronized(lock) {
-            doUpdate = true;
             servicesToInterests.put(id, newData);
+            
+            Map<String, Filter> tmpFilters = topicsToFilters;
             topicsToFilters = getUpdatedFilters();
+            
+            doUpdate = !tmpFilters.equals(topicsToFilters);
+            
             updatedFilters = topicsToFilters;
         }
         
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteServiceEventsActivator.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteServiceEventsActivator.java
index 83b80dd..130d839 100644
--- a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteServiceEventsActivator.java
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteServiceEventsActivator.java
@@ -95,7 +95,8 @@ public class RemoteServiceEventsActivator implements BundleActivator {
                 .flatMap(remi -> register(RemoteEventMonitor.class, remi, new HashMap<>()));
 
         OSGi<Object> remote = bundleContext().flatMap(ctx -> service(once(serviceReferences(TypedEventBus.class)))
-                .map(RemoteEventBusImpl::new).effects(rebi -> rebi.init(ctx), rebi -> rebi.destroy())
+                .map(teb -> new RemoteEventBusImpl(teb, configuration))
+                .effects(rebi -> rebi.init(ctx), rebi -> rebi.destroy())
                 .flatMap(rebi -> all(
                         just(new UntypedEventTracker(ctx, rebi)).map(ServiceTracker.class::cast)
                                 .effects(st -> st.open(), st -> st.close()),
@@ -124,7 +125,7 @@ public class RemoteServiceEventsActivator implements BundleActivator {
         return map;
     }
 
-    private Map<String, Object> getServiceProps(ServiceReference<?> ref) {
+    private static Map<String, Object> getServiceProps(ServiceReference<?> ref) {
         return Arrays.stream(ref.getPropertyKeys()).collect(Collectors.toMap(identity(), ref::getProperty));
     }
 
@@ -185,7 +186,7 @@ public class RemoteServiceEventsActivator implements BundleActivator {
                 // TODO Auto-generated catch block
                 return reference;
             }
-            impl.updateLocalInterest(getServiceId(reference), getTopics(reference), filter);
+            impl.updateLocalInterest(getServiceId(reference), getTopics(reference), filter, getServiceProps(reference));
             return reference;
         }
 
@@ -199,7 +200,7 @@ public class RemoteServiceEventsActivator implements BundleActivator {
                 impl.removeLocalInterest(getServiceId(reference));
                 return;
             }
-            impl.updateLocalInterest(getServiceId(reference), getTopics(reference), filter);
+            impl.updateLocalInterest(getServiceId(reference), getTopics(reference), filter, getServiceProps(reference));
         }
 
         @Override
@@ -230,7 +231,7 @@ public class RemoteServiceEventsActivator implements BundleActivator {
             }
             List<String> topics = findTopics(reference, toReturn);
             if (!topics.isEmpty()) {
-                impl.updateLocalInterest(getServiceId(reference), topics, filter);
+                impl.updateLocalInterest(getServiceId(reference), topics, filter, getServiceProps(reference));
             }
             return toReturn;
         }
@@ -320,7 +321,7 @@ public class RemoteServiceEventsActivator implements BundleActivator {
             if (topics.isEmpty()) {
                 impl.removeLocalInterest(getServiceId(reference));
             } else {
-                impl.updateLocalInterest(getServiceId(reference), getTopics(reference), filter);
+                impl.updateLocalInterest(getServiceId(reference), getTopics(reference), filter, getServiceProps(reference));
             }
         }
 
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImplTest.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImplTest.java
index 9ddd8e1..abe7b7d 100644
--- a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImplTest.java
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImplTest.java
@@ -16,13 +16,18 @@
  */
 package org.apache.aries.typedevent.remote.remoteservices.impl;
 
+import static java.lang.Boolean.TRUE;
 import static java.util.Collections.emptyList;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonMap;
+import static org.apache.aries.typedevent.remote.api.RemoteEventConstants.RECEIVE_REMOTE_EVENTS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.osgi.framework.FrameworkUtil.createFilter;
 
 import java.util.Arrays;
 import java.util.Dictionary;
+import java.util.HashMap;
 
 import org.apache.aries.typedevent.remote.remoteservices.spi.RemoteEventBus;
 import org.junit.jupiter.api.AfterEach;
@@ -65,7 +70,7 @@ public class RemoteEventBusImplTest {
                 Mockito.any(RemoteEventBus.class), Mockito.any())).thenReturn(remoteReg);
         Mockito.when(remoteReg.getReference()).thenReturn(remoteRef);
         
-        remoteImpl = new RemoteEventBusImpl(eventBusImpl);
+        remoteImpl = new RemoteEventBusImpl(eventBusImpl, new HashMap<>());
     }
 
     
@@ -98,7 +103,8 @@ public class RemoteEventBusImplTest {
     @Test
     public void testStartWithDetails() throws InvalidSyntaxException {
         
-        remoteImpl.updateLocalInterest(42L, Arrays.asList("FOO"), createFilter("(fizz=buzz)"));
+        remoteImpl.updateLocalInterest(42L, Arrays.asList("FOO"), createFilter("(fizz=buzz)"), 
+                singletonMap(RECEIVE_REMOTE_EVENTS, TRUE));
         
         remoteImpl.init(context);
         
@@ -139,7 +145,8 @@ public class RemoteEventBusImplTest {
         
         // Add a listener to the remote
         
-        remoteImpl.updateLocalInterest(42L, Arrays.asList("FOO"), createFilter("(fizz=buzz)"));
+        remoteImpl.updateLocalInterest(42L, Arrays.asList("FOO"), createFilter("(fizz=buzz)"),
+                singletonMap(RECEIVE_REMOTE_EVENTS, TRUE));
         
         Mockito.verify(remoteReg, Mockito.times(2)).setProperties(propsCaptor.capture());
         
@@ -148,4 +155,28 @@ public class RemoteEventBusImplTest {
         assertEquals(RemoteEventBus.class.getName(), props.get("service.exported.interfaces"));
         assertEquals(Arrays.asList("FOO=(fizz=buzz)"), props.get(RemoteEventBus.REMOTE_EVENT_FILTERS));
     }
+    
+    @Test
+    public void testStartWithNonRemoteListener() throws InvalidSyntaxException {
+        // The listener's service properties are empty, so RECEIVE_REMOTE_EVENTS is absent
+        remoteImpl.updateLocalInterest(42L, Arrays.asList("FOO"), createFilter("(fizz=buzz)"), 
+                emptyMap());
+        
+        remoteImpl.init(context);
+        
+        ArgumentCaptor<Dictionary<String, Object>> propsCaptor = ArgumentCaptor.forClass(Dictionary.class); 
+        
+        Mockito.verify(context).registerService(Mockito.eq(RemoteEventBus.class), Mockito.same(remoteImpl),
+                propsCaptor.capture());
+        // The service is expected to be registered with null properties
+        Dictionary<String, Object> props = propsCaptor.getValue();
+        assertNull(props);
+        // The properties are expected to be supplied through a single setProperties call
+        Mockito.verify(remoteReg).setProperties(propsCaptor.capture());
+        
+        props = propsCaptor.getValue();
+        // The FOO interest must not be advertised in the remote filters
+        assertEquals(RemoteEventBus.class.getName(), props.get("service.exported.interfaces"));
+        assertEquals(emptyList(), props.get(RemoteEventBus.REMOTE_EVENT_FILTERS));
+    }
 }
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/osgi/RemoteEventBusIntegrationTest.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/osgi/RemoteEventBusIntegrationTest.java
index 9788848..c097a62 100644
--- a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/osgi/RemoteEventBusIntegrationTest.java
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/osgi/RemoteEventBusIntegrationTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.aries.typedevent.remote.remoteservices.osgi;
 
+import static org.apache.aries.typedevent.remote.api.RemoteEventConstants.RECEIVE_REMOTE_EVENTS;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.ArgumentMatchers.eq;
@@ -91,7 +92,7 @@ public class RemoteEventBusIntegrationTest extends AbstractIntegrationTest {
     TypedEventBus bus;
 
     @Mock
-    UntypedEventHandler untypedEventHandler;
+    UntypedEventHandler untypedEventHandler, untypedEventHandler2;
 
     @Mock
     UnhandledEventHandler unhandledEventHandler;
@@ -330,19 +331,45 @@ public class RemoteEventBusIntegrationTest extends AbstractIntegrationTest {
         
         props = new Hashtable<>();
         props.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
+        props.put(RECEIVE_REMOTE_EVENTS, true);
         props.put(TYPED_EVENT_FILTER, "(message=boo)");
         
         regs.add(remoteContext.registerService(UNTYPED_HANDLER, 
                 new EventHandlerFactory(untypedEventHandler, UNTYPED_HANDLER), props));
+
+        props = new Hashtable<>();
+        props.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
+        props.put(RECEIVE_REMOTE_EVENTS, false);
+        props.put(TYPED_EVENT_FILTER, "(message=far)");
+        
+        regs.add(remoteContext.registerService(UNTYPED_HANDLER, 
+                new EventHandlerFactory(untypedEventHandler2, UNTYPED_HANDLER), props));
         
         
         bus.deliver(event);
         
         verify(unhandledEventHandler, Mockito.after(1000).times(1))
             .notifyUnhandled(eq(TEST_EVENT_TOPIC), argThat(isUntypedTestEventWithMessage("boo")));
+
+        verify(untypedEventHandler2, Mockito.after(1000).never())
+            .notifyUntyped(eq(TEST_EVENT_TOPIC), argThat(isUntypedTestEventWithMessage("boo")));
         
         verify(untypedEventHandler)
             .notifyUntyped(eq(TEST_EVENT_TOPIC), argThat(isUntypedTestEventWithMessage("boo")));
+        
+        event = new TestEvent();
+        event.message = "far";
+        
+        bus.deliver(event);
+        
+        verify(unhandledEventHandler, Mockito.after(100).times(1))
+        .notifyUnhandled(eq(TEST_EVENT_TOPIC), argThat(isUntypedTestEventWithMessage("far")));
+
+        verify(untypedEventHandler2, Mockito.after(1000).never())
+        .notifyUntyped(eq(TEST_EVENT_TOPIC), argThat(isUntypedTestEventWithMessage("far")));
+    
+        verify(untypedEventHandler, Mockito.after(1000).never())
+        .notifyUntyped(eq(TEST_EVENT_TOPIC), argThat(isUntypedTestEventWithMessage("far")));
     }
     
 }
\ No newline at end of file