You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@isis.apache.org by ah...@apache.org on 2018/08/27 11:07:09 UTC
[isis] branch master updated: ISIS-1905 migrating axon from v2.x to 3.x
This is an automated email from the ASF dual-hosted git repository.
ahuber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/isis.git
The following commit(s) were added to refs/heads/master by this push:
new aa638f9 ISIS-1905 migrating axon from v2.x to 3.x
aa638f9 is described below
commit aa638f97e4c9d1a35611e981f366d21af731ee90
Author: Andi Huber <ah...@apache.org>
AuthorDate: Mon Aug 27 13:07:01 2018 +0200
ISIS-1905 migrating axon from v2.x to 3.x
---
core/plugins/eventbus-axon/pom.xml | 1 +
.../plugins/eventbus/EventBusPluginForAxon.java | 93 ++++++++++++++++------
.../EventBusServiceDefaultUsingAxonSimpleTest.java | 6 +-
core/pom.xml | 2 -
4 files changed, 71 insertions(+), 31 deletions(-)
diff --git a/core/plugins/eventbus-axon/pom.xml b/core/plugins/eventbus-axon/pom.xml
index b2d95c1..fb84f56 100644
--- a/core/plugins/eventbus-axon/pom.xml
+++ b/core/plugins/eventbus-axon/pom.xml
@@ -30,6 +30,7 @@
<properties>
<jar-plugin.automaticModuleName>org.apache.isis.plugins.eventbus-axon</jar-plugin.automaticModuleName>
<git-plugin.propertiesDir>org/apache/isis/plugins/eventbus-axon</git-plugin.propertiesDir>
+ <axon-core.version>3.3.5</axon-core.version>
</properties>
<build>
diff --git a/core/plugins/eventbus-axon/src/main/java/org/apache/isis/core/plugins/eventbus/EventBusPluginForAxon.java b/core/plugins/eventbus-axon/src/main/java/org/apache/isis/core/plugins/eventbus/EventBusPluginForAxon.java
index 2cfd6c9..99d3b65 100644
--- a/core/plugins/eventbus-axon/src/main/java/org/apache/isis/core/plugins/eventbus/EventBusPluginForAxon.java
+++ b/core/plugins/eventbus-axon/src/main/java/org/apache/isis/core/plugins/eventbus/EventBusPluginForAxon.java
@@ -16,19 +16,25 @@
*/
package org.apache.isis.core.plugins.eventbus;
+import static java.util.Objects.requireNonNull;
+import static org.apache.isis.commons.internal.base._With.acceptIfPresent;
+
+import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
-import org.apache.isis.applib.events.domain.AbstractDomainEvent;
-import org.apache.isis.core.plugins.eventbus.EventBusPlugin;
-import org.apache.isis.core.runtime.services.eventbus.EventBusImplementationAbstract;
-import org.axonframework.domain.EventMessage;
-import org.axonframework.domain.GenericEventMessage;
+import org.axonframework.common.Registration;
+import org.axonframework.eventhandling.AnnotationEventListenerAdapter;
import org.axonframework.eventhandling.EventListenerProxy;
+import org.axonframework.eventhandling.EventMessage;
+import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.SimpleEventBus;
-import org.axonframework.eventhandling.annotation.AnnotationEventListenerAdapter;
+
+import org.apache.isis.applib.events.domain.AbstractDomainEvent;
+import org.apache.isis.commons.internal.base._NullSafe;
+import org.apache.isis.core.runtime.services.eventbus.EventBusImplementationAbstract;
+
/**
* A wrapper for an Axon {@link org.axonframework.eventhandling.SimpleEventBus},
@@ -42,22 +48,18 @@ public class EventBusPluginForAxon extends EventBusImplementationAbstract {
@Override
public void register(final Object domainService) {
- simpleEventBus.subscribe(adapterFor(domainService));
+ final AxonEventListenerAdapter adapter = lookupOrCreateAdapterFor(domainService);
+ final Registration registrationHandle = simpleEventBus.subscribe(eventProcessorFor(adapter));
+ adapter.registration = registrationHandle;
}
-
+
@Override
public void unregister(final Object domainService) {
- // Seems it's needed to be a no-op (See EventBusService).
- // AxonSimpleEventBusAdapter.simpleEventBus.unsubscribe(AxonSimpleEventBusAdapter.adapterFor(domainService));
+ acceptIfPresent(lookupAdapterFor(domainService), adapter->{
+ acceptIfPresent(adapter.registration, Registration::cancel);
+ });
}
- /*
- * Logic equivalent to Guava Event Bus.
- *
- * <p>
- * Despite that, event processing cannot be followed after an Exception is thrown.
- * </p>
- */
@Override
public void post(final Object event) {
simpleEventBus.publish(GenericEventMessage.asEventMessage(event));
@@ -69,14 +71,19 @@ public class EventBusPluginForAxon extends EventBusImplementationAbstract {
final Consumer<T> onEvent) {
final AxonEventListener<T> eventListener = new AxonEventListener<T>(targetType, onEvent);
- simpleEventBus.subscribe(eventListener.proxy());
+ final EventListenerProxy proxy = eventListener.proxy();
+
+ final Registration registrationHandle = simpleEventBus.subscribe(eventProcessorFor(proxy));
+ eventListener.registration = registrationHandle;
+
return eventListener;
}
@Override
public <T> void removeEventListener(EventBusPlugin.EventListener<T> eventListener) {
if(eventListener instanceof AxonEventListener) {
- simpleEventBus.unsubscribe(((AxonEventListener<T>)eventListener).proxy());
+ final AxonEventListener<T> listenerInstance = (AxonEventListener<T>) eventListener;
+ acceptIfPresent(listenerInstance.registration, Registration::cancel);
}
}
@@ -90,12 +97,40 @@ public class EventBusPluginForAxon extends EventBusImplementationAbstract {
final Object payload = genericEventMessage.getPayload();
return asDomainEventIfPossible(payload);
}
- // don't think this occurs with axon, but this is the original behaviour
+ // don't think this occurs with axon, but this is the original behavior
// before the above change to detect GenericEventMessage
return asDomainEventIfPossible(event);
}
// -- HELPER
+
+ private Consumer<List<? extends EventMessage<?>>> eventProcessorFor(final EventListenerProxy proxy) {
+ return eventMessages->{
+ _NullSafe.stream(eventMessages)
+ .filter(proxy::canHandle)
+ .forEach(event->{
+ try {
+ proxy.handle(event);
+ } catch (final Exception exception) {
+ processException(exception, event);
+ }
+ });
+ };
+ }
+
+ private Consumer<List<? extends EventMessage<?>>> eventProcessorFor(final AxonEventListenerAdapter adapter) {
+ return eventMessages->{
+ _NullSafe.stream(eventMessages)
+ .filter(adapter::canHandle)
+ .forEach(event->{
+ try {
+ adapter.handle(event);
+ } catch (final Exception exception) {
+ processException(exception, event);
+ }
+ });
+ };
+ }
/**
* Wraps a Consumer as EventBusImplementation.EventListener with the given targetType.
@@ -105,8 +140,10 @@ public class EventBusPluginForAxon extends EventBusImplementationAbstract {
static class AxonEventListener<T> implements EventBusPlugin.EventListener<T> {
private final Consumer<T> eventConsumer;
private final EventListenerProxy proxy;
+ private Registration registration;
+
private AxonEventListener(final Class<T> targetType, final Consumer<T> eventConsumer) {
- this.eventConsumer = Objects.requireNonNull(eventConsumer);
+ this.eventConsumer = requireNonNull(eventConsumer);
this.proxy = new EventListenerProxy() {
@SuppressWarnings("unchecked")
@Override
@@ -135,8 +172,8 @@ public class EventBusPluginForAxon extends EventBusImplementationAbstract {
}
- private AxonEventListenerAdapter adapterFor(final Object domainService) {
- AxonEventListenerAdapter annotationEventListenerAdapter = listenerAdapterByDomainService.get(domainService);
+ private AxonEventListenerAdapter lookupOrCreateAdapterFor(final Object domainService) {
+ AxonEventListenerAdapter annotationEventListenerAdapter = lookupAdapterFor(domainService);
if (annotationEventListenerAdapter == null) {
annotationEventListenerAdapter = new AxonEventListenerAdapter(domainService);
listenerAdapterByDomainService.put(domainService, annotationEventListenerAdapter);
@@ -144,6 +181,10 @@ public class EventBusPluginForAxon extends EventBusImplementationAbstract {
return annotationEventListenerAdapter;
}
+ private AxonEventListenerAdapter lookupAdapterFor(final Object domainService) {
+ return listenerAdapterByDomainService.get(domainService);
+ }
+
private AbstractDomainEvent<?> asDomainEventIfPossible(final Object event) {
if (event instanceof AbstractDomainEvent)
return (AbstractDomainEvent<?>) event;
@@ -153,6 +194,8 @@ public class EventBusPluginForAxon extends EventBusImplementationAbstract {
class AxonEventListenerAdapter extends AnnotationEventListenerAdapter {
+ private Registration registration;
+
public AxonEventListenerAdapter(final Object annotatedEventListener) {
super(annotatedEventListener);
}
@@ -170,6 +213,4 @@ public class EventBusPluginForAxon extends EventBusImplementationAbstract {
}
}
-
-
}
\ No newline at end of file
diff --git a/core/plugins/eventbus-axon/src/test/java/org/apache/isis/core/runtime/services/eventbus/EventBusServiceDefaultUsingAxonSimpleTest.java b/core/plugins/eventbus-axon/src/test/java/org/apache/isis/core/runtime/services/eventbus/EventBusServiceDefaultUsingAxonSimpleTest.java
index 0f03b53..0cd6726 100644
--- a/core/plugins/eventbus-axon/src/test/java/org/apache/isis/core/runtime/services/eventbus/EventBusServiceDefaultUsingAxonSimpleTest.java
+++ b/core/plugins/eventbus-axon/src/test/java/org/apache/isis/core/runtime/services/eventbus/EventBusServiceDefaultUsingAxonSimpleTest.java
@@ -62,7 +62,7 @@ public class EventBusServiceDefaultUsingAxonSimpleTest {
public static class Subscriber1 {
Type1 obj;
- @org.axonframework.eventhandling.annotation.EventHandler
+ @org.axonframework.eventhandling.EventHandler
public void on1(Type1 obj) {
this.obj = obj;
}
@@ -70,7 +70,7 @@ public class EventBusServiceDefaultUsingAxonSimpleTest {
public static class Subscriber2 {
Type1 obj;
- @org.axonframework.eventhandling.annotation.EventHandler
+ @org.axonframework.eventhandling.EventHandler
public void on2(Type1 obj) {
this.obj = obj;
}
@@ -78,7 +78,7 @@ public class EventBusServiceDefaultUsingAxonSimpleTest {
public static class Subscriber3 {
Type3 obj;
- @org.axonframework.eventhandling.annotation.EventHandler
+ @org.axonframework.eventhandling.EventHandler
public void on3(Type3 obj) {
this.obj = obj;
}
diff --git a/core/pom.xml b/core/pom.xml
index d831369..2b81498 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -142,8 +142,6 @@
<commons-io.version>2.6</commons-io.version>
<com-sun-mail.version>1.5.2</com-sun-mail.version>
- <axon-core.version>3.3.5</axon-core.version>
-
<jackson.version>2.8.0</jackson.version>
<gson.version>2.7</gson.version>
<swagger-core.version>1.5.9</swagger-core.version>