You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@isis.apache.org by ah...@apache.org on 2018/08/27 11:07:09 UTC

[isis] branch master updated: ISIS-1905 migrating axon from v2.x to 3.x

This is an automated email from the ASF dual-hosted git repository.

ahuber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/isis.git


The following commit(s) were added to refs/heads/master by this push:
     new aa638f9  ISIS-1905 migrating axon from v2.x to 3.x
aa638f9 is described below

commit aa638f97e4c9d1a35611e981f366d21af731ee90
Author: Andi Huber <ah...@apache.org>
AuthorDate: Mon Aug 27 13:07:01 2018 +0200

    ISIS-1905 migrating axon from v2.x to 3.x
---
 core/plugins/eventbus-axon/pom.xml                 |  1 +
 .../plugins/eventbus/EventBusPluginForAxon.java    | 93 ++++++++++++++++------
 .../EventBusServiceDefaultUsingAxonSimpleTest.java |  6 +-
 core/pom.xml                                       |  2 -
 4 files changed, 71 insertions(+), 31 deletions(-)

diff --git a/core/plugins/eventbus-axon/pom.xml b/core/plugins/eventbus-axon/pom.xml
index b2d95c1..fb84f56 100644
--- a/core/plugins/eventbus-axon/pom.xml
+++ b/core/plugins/eventbus-axon/pom.xml
@@ -30,6 +30,7 @@
 	<properties>
 		<jar-plugin.automaticModuleName>org.apache.isis.plugins.eventbus-axon</jar-plugin.automaticModuleName>
 		<git-plugin.propertiesDir>org/apache/isis/plugins/eventbus-axon</git-plugin.propertiesDir>
+		<axon-core.version>3.3.5</axon-core.version>
 	</properties>
 
 	<build>
diff --git a/core/plugins/eventbus-axon/src/main/java/org/apache/isis/core/plugins/eventbus/EventBusPluginForAxon.java b/core/plugins/eventbus-axon/src/main/java/org/apache/isis/core/plugins/eventbus/EventBusPluginForAxon.java
index 2cfd6c9..99d3b65 100644
--- a/core/plugins/eventbus-axon/src/main/java/org/apache/isis/core/plugins/eventbus/EventBusPluginForAxon.java
+++ b/core/plugins/eventbus-axon/src/main/java/org/apache/isis/core/plugins/eventbus/EventBusPluginForAxon.java
@@ -16,19 +16,25 @@
  */
 package org.apache.isis.core.plugins.eventbus;
 
+import static java.util.Objects.requireNonNull;
+import static org.apache.isis.commons.internal.base._With.acceptIfPresent;
+
+import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Consumer;
 
-import org.apache.isis.applib.events.domain.AbstractDomainEvent;
-import org.apache.isis.core.plugins.eventbus.EventBusPlugin;
-import org.apache.isis.core.runtime.services.eventbus.EventBusImplementationAbstract;
-import org.axonframework.domain.EventMessage;
-import org.axonframework.domain.GenericEventMessage;
+import org.axonframework.common.Registration;
+import org.axonframework.eventhandling.AnnotationEventListenerAdapter;
 import org.axonframework.eventhandling.EventListenerProxy;
+import org.axonframework.eventhandling.EventMessage;
+import org.axonframework.eventhandling.GenericEventMessage;
 import org.axonframework.eventhandling.SimpleEventBus;
-import org.axonframework.eventhandling.annotation.AnnotationEventListenerAdapter;
+
+import org.apache.isis.applib.events.domain.AbstractDomainEvent;
+import org.apache.isis.commons.internal.base._NullSafe;
+import org.apache.isis.core.runtime.services.eventbus.EventBusImplementationAbstract;
+
 
 /**
  * A wrapper for an Axon {@link org.axonframework.eventhandling.SimpleEventBus},
@@ -42,22 +48,18 @@ public class EventBusPluginForAxon extends EventBusImplementationAbstract {
 
     @Override
     public void register(final Object domainService) {
-        simpleEventBus.subscribe(adapterFor(domainService));
+        final AxonEventListenerAdapter adapter = lookupOrCreateAdapterFor(domainService);
+        final Registration registrationHandle = simpleEventBus.subscribe(eventProcessorFor(adapter));
+        adapter.registration = registrationHandle;
     }
-
+    
     @Override
     public void unregister(final Object domainService) {
-        // Seems it's needed to be a no-op (See EventBusService).
-        // AxonSimpleEventBusAdapter.simpleEventBus.unsubscribe(AxonSimpleEventBusAdapter.adapterFor(domainService));
+        acceptIfPresent(lookupAdapterFor(domainService), adapter->{
+            acceptIfPresent(adapter.registration, Registration::cancel);
+        });
     }
 
-    /*
-     * Logic equivalent to Guava Event Bus.
-     *
-     * <p>
-     *     Despite that, event processing cannot be followed after an Exception is thrown.
-     * </p>
-     */
     @Override
     public void post(final Object event) {
         simpleEventBus.publish(GenericEventMessage.asEventMessage(event));
@@ -69,14 +71,19 @@ public class EventBusPluginForAxon extends EventBusImplementationAbstract {
             final Consumer<T> onEvent) {
 
         final AxonEventListener<T> eventListener = new AxonEventListener<T>(targetType, onEvent);
-        simpleEventBus.subscribe(eventListener.proxy());
+        final EventListenerProxy proxy = eventListener.proxy();
+        
+        final Registration registrationHandle = simpleEventBus.subscribe(eventProcessorFor(proxy));
+        eventListener.registration = registrationHandle;
+        
         return eventListener;
     }
 
     @Override
     public <T> void removeEventListener(EventBusPlugin.EventListener<T> eventListener) {
         if(eventListener instanceof AxonEventListener) {
-            simpleEventBus.unsubscribe(((AxonEventListener<T>)eventListener).proxy());
+            final AxonEventListener<T> listenerInstance = (AxonEventListener<T>) eventListener;
+            acceptIfPresent(listenerInstance.registration, Registration::cancel);
         }
     }
 
@@ -90,12 +97,40 @@ public class EventBusPluginForAxon extends EventBusImplementationAbstract {
             final Object payload = genericEventMessage.getPayload();
             return asDomainEventIfPossible(payload);
         }
-        // don't think this occurs with axon, but this is the original behaviour
+        // don't think this occurs with axon, but this is the original behavior
         // before the above change to detect GenericEventMessage
         return asDomainEventIfPossible(event);
     }
 
     // -- HELPER
+    
+    private Consumer<List<? extends EventMessage<?>>> eventProcessorFor(final EventListenerProxy proxy) {
+        return eventMessages->{
+            _NullSafe.stream(eventMessages)
+            .filter(proxy::canHandle)
+            .forEach(event->{
+                try {
+                    proxy.handle(event);
+                } catch (final Exception exception) {
+                    processException(exception, event);
+                }
+            });
+        };
+    }
+    
+    private Consumer<List<? extends EventMessage<?>>> eventProcessorFor(final AxonEventListenerAdapter adapter) {
+        return eventMessages->{
+            _NullSafe.stream(eventMessages)
+            .filter(adapter::canHandle)
+            .forEach(event->{
+                try {
+                    adapter.handle(event);
+                } catch (final Exception exception) {
+                    processException(exception, event);
+                }
+            });
+        };
+    }
 
     /**
      * Wraps a Consumer as EventBusImplementation.EventListener with the given targetType.
@@ -105,8 +140,10 @@ public class EventBusPluginForAxon extends EventBusImplementationAbstract {
     static class AxonEventListener<T> implements EventBusPlugin.EventListener<T> {
         private final Consumer<T> eventConsumer;
         private final EventListenerProxy proxy;
+        private Registration registration;
+        
         private AxonEventListener(final Class<T> targetType, final Consumer<T> eventConsumer) {
-            this.eventConsumer = Objects.requireNonNull(eventConsumer);
+            this.eventConsumer = requireNonNull(eventConsumer);
             this.proxy = new EventListenerProxy() {
                 @SuppressWarnings("unchecked")
                 @Override
@@ -135,8 +172,8 @@ public class EventBusPluginForAxon extends EventBusImplementationAbstract {
 
     }
 
-    private AxonEventListenerAdapter adapterFor(final Object domainService) {
-        AxonEventListenerAdapter annotationEventListenerAdapter = listenerAdapterByDomainService.get(domainService);
+    private AxonEventListenerAdapter lookupOrCreateAdapterFor(final Object domainService) {
+        AxonEventListenerAdapter annotationEventListenerAdapter = lookupAdapterFor(domainService);
         if (annotationEventListenerAdapter == null) {
             annotationEventListenerAdapter = new AxonEventListenerAdapter(domainService);
             listenerAdapterByDomainService.put(domainService, annotationEventListenerAdapter);
@@ -144,6 +181,10 @@ public class EventBusPluginForAxon extends EventBusImplementationAbstract {
         return annotationEventListenerAdapter;
     }
 
+    private AxonEventListenerAdapter lookupAdapterFor(final Object domainService) {
+        return listenerAdapterByDomainService.get(domainService);
+    }
+    
     private AbstractDomainEvent<?> asDomainEventIfPossible(final Object event) {
         if (event instanceof AbstractDomainEvent)
             return (AbstractDomainEvent<?>) event;
@@ -153,6 +194,8 @@ public class EventBusPluginForAxon extends EventBusImplementationAbstract {
 
     class AxonEventListenerAdapter extends AnnotationEventListenerAdapter {
 
+        private Registration registration;
+
         public AxonEventListenerAdapter(final Object annotatedEventListener) {
             super(annotatedEventListener);
         }
@@ -170,6 +213,4 @@ public class EventBusPluginForAxon extends EventBusImplementationAbstract {
         }
     }
 
-
-
 }
\ No newline at end of file
diff --git a/core/plugins/eventbus-axon/src/test/java/org/apache/isis/core/runtime/services/eventbus/EventBusServiceDefaultUsingAxonSimpleTest.java b/core/plugins/eventbus-axon/src/test/java/org/apache/isis/core/runtime/services/eventbus/EventBusServiceDefaultUsingAxonSimpleTest.java
index 0f03b53..0cd6726 100644
--- a/core/plugins/eventbus-axon/src/test/java/org/apache/isis/core/runtime/services/eventbus/EventBusServiceDefaultUsingAxonSimpleTest.java
+++ b/core/plugins/eventbus-axon/src/test/java/org/apache/isis/core/runtime/services/eventbus/EventBusServiceDefaultUsingAxonSimpleTest.java
@@ -62,7 +62,7 @@ public class EventBusServiceDefaultUsingAxonSimpleTest {
 
         public static class Subscriber1 {
         	Type1 obj;
-        	@org.axonframework.eventhandling.annotation.EventHandler
+        	@org.axonframework.eventhandling.EventHandler
             public void on1(Type1 obj) {
                 this.obj = obj;
             }
@@ -70,7 +70,7 @@ public class EventBusServiceDefaultUsingAxonSimpleTest {
         
         public static class Subscriber2 {
         	Type1 obj;
-        	@org.axonframework.eventhandling.annotation.EventHandler
+        	@org.axonframework.eventhandling.EventHandler
             public void on2(Type1 obj) {
                 this.obj = obj;
             }
@@ -78,7 +78,7 @@ public class EventBusServiceDefaultUsingAxonSimpleTest {
         
         public static class Subscriber3 {
         	Type3 obj;
-        	@org.axonframework.eventhandling.annotation.EventHandler
+        	@org.axonframework.eventhandling.EventHandler
             public void on3(Type3 obj) {
                 this.obj = obj;
             }
diff --git a/core/pom.xml b/core/pom.xml
index d831369..2b81498 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -142,8 +142,6 @@
         <commons-io.version>2.6</commons-io.version>
         <com-sun-mail.version>1.5.2</com-sun-mail.version>
 
-        <axon-core.version>3.3.5</axon-core.version>
-
         <jackson.version>2.8.0</jackson.version>
         <gson.version>2.7</gson.version>
         <swagger-core.version>1.5.9</swagger-core.version>