You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by as...@apache.org on 2016/05/03 13:20:14 UTC
camel git commit: Camel CDI: Better handling of CDI event endpoints
Repository: camel
Updated Branches:
refs/heads/master a9180c61f -> 3fe56aefa
Camel CDI: Better handling of CDI event endpoints
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3fe56aef
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3fe56aef
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3fe56aef
Branch: refs/heads/master
Commit: 3fe56aefac9d4454e80d0cca4bf001592d2c67c8
Parents: a9180c6
Author: Antonin Stefanutti <an...@stefanutti.fr>
Authored: Tue May 3 13:20:06 2016 +0200
Committer: Antonin Stefanutti <an...@stefanutti.fr>
Committed: Tue May 3 13:20:06 2016 +0200
----------------------------------------------------------------------
.../org/apache/camel/cdi/CdiCamelExtension.java | 31 +++---
.../org/apache/camel/cdi/CdiCamelFactory.java | 47 +--------
.../org/apache/camel/cdi/CdiEventConsumer.java | 6 +-
.../org/apache/camel/cdi/CdiEventEndpoint.java | 100 ++++++++++++++++---
.../org/apache/camel/cdi/CdiEventProducer.java | 2 +-
.../java/org/apache/camel/cdi/CdiSpiHelper.java | 4 +-
.../camel/cdi/ForwardingObserverMethod.java | 24 ++---
7 files changed, 118 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/3fe56aef/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelExtension.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelExtension.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelExtension.java
index 0c5c408..c26f09f 100755
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelExtension.java
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelExtension.java
@@ -82,6 +82,7 @@ import static org.apache.camel.cdi.AnyLiteral.ANY;
import static org.apache.camel.cdi.ApplicationScopedLiteral.APPLICATION_SCOPED;
import static org.apache.camel.cdi.BeanManagerHelper.getReference;
import static org.apache.camel.cdi.BeanManagerHelper.getReferencesByType;
+import static org.apache.camel.cdi.CdiEventEndpoint.eventEndpointUri;
import static org.apache.camel.cdi.CdiSpiHelper.getQualifiers;
import static org.apache.camel.cdi.CdiSpiHelper.getRawType;
import static org.apache.camel.cdi.CdiSpiHelper.hasAnnotation;
@@ -104,7 +105,7 @@ public class CdiCamelExtension implements Extension {
private final Set<AnnotatedType<?>> eagerBeans = newSetFromMap(new ConcurrentHashMap<>());
- private final Map<InjectionPoint, ForwardingObserverMethod<?>> cdiEventEndpoints = new ConcurrentHashMap<>();
+ private final Map<String, CdiEventEndpoint<?>> cdiEventEndpoints = new ConcurrentHashMap<>();
private final Set<Bean<?>> cdiBeans = newSetFromMap(new ConcurrentHashMap<>());
@@ -118,8 +119,8 @@ public class CdiCamelExtension implements Extension {
private final Set<ImportResource> resources = newSetFromMap(new ConcurrentHashMap<>());
- ForwardingObserverMethod<?> getObserverMethod(InjectionPoint ip) {
- return cdiEventEndpoints.get(ip);
+ CdiEventEndpoint<?> getEventEndpoint(String uri) {
+ return cdiEventEndpoints.get(uri);
}
Set<Annotation> getObserverEvents() {
@@ -218,19 +219,17 @@ public class CdiCamelExtension implements Extension {
cdiBeans.add(pb.getBean());
}
- private void beans(@Observes ProcessBean<?> pb) {
+ private void beans(@Observes ProcessBean<?> pb, BeanManager manager) {
cdiBeans.add(pb.getBean());
-
- // TODO: refine the key to the type and qualifiers instead of the whole injection point as it leads to registering redundant observers
+ // Lookup for CDI event endpoint injection points
pb.getBean().getInjectionPoints().stream()
.filter(ip -> CdiEventEndpoint.class.equals(getRawType(ip.getType())))
.forEach(ip -> {
- // TODO: refine the key to the type and qualifiers instead of the whole injection point as it leads to registering redundant observers
- if (ip.getType() instanceof ParameterizedType) {
- cdiEventEndpoints.put(ip, new ForwardingObserverMethod<>(((ParameterizedType) ip.getType()).getActualTypeArguments()[0], ip.getQualifiers()));
- } else if (ip.getType() instanceof Class) {
- cdiEventEndpoints.put(ip, new ForwardingObserverMethod<>(Object.class, ip.getQualifiers()));
- }
+ Type type = ip.getType() instanceof ParameterizedType
+ ? ((ParameterizedType) ip.getType()).getActualTypeArguments()[0]
+ : Object.class;
+ String uri = eventEndpointUri(type, ip.getQualifiers());
+ cdiEventEndpoints.put(uri, new CdiEventEndpoint<>(uri, type, ip.getQualifiers(), manager));
});
}
@@ -298,8 +297,8 @@ public class CdiCamelExtension implements Extension {
extraBeans.forEach(abd::addBean);
// Update the CDI Camel factory beans
- Set<Annotation> endpointQualifiers = cdiEventEndpoints.keySet().stream()
- .map(InjectionPoint::getQualifiers)
+ Set<Annotation> endpointQualifiers = cdiEventEndpoints.values().stream()
+ .map(CdiEventEndpoint::getQualifiers)
.flatMap(Set::stream)
.collect(toSet());
Set<Annotation> templateQualifiers = contextQualifiers.stream()
@@ -315,7 +314,9 @@ public class CdiCamelExtension implements Extension {
.forEach(abd::addBean);
// Add CDI event endpoint observer methods
- cdiEventEndpoints.values().forEach(abd::addObserverMethod);
+ cdiEventEndpoints.values().stream()
+ .map(ForwardingObserverMethod::new)
+ .forEach(abd::addObserverMethod);
}
private boolean shouldDeployDefaultCamelContext(Set<Bean<?>> beans) {
http://git-wip-us.apache.org/repos/asf/camel/blob/3fe56aef/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelFactory.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelFactory.java
index 8644400..b0e87aa 100755
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelFactory.java
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelFactory.java
@@ -17,17 +17,11 @@
package org.apache.camel.cdi;
import java.lang.annotation.Annotation;
-import java.lang.reflect.Field;
-import java.lang.reflect.GenericArrayType;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.HashSet;
import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Stream;
-
-import static java.util.stream.Collectors.joining;
import javax.enterprise.event.Event;
import javax.enterprise.inject.Any;
@@ -38,7 +32,6 @@ import javax.enterprise.inject.Produces;
import javax.enterprise.inject.Typed;
import javax.enterprise.inject.UnsatisfiedResolutionException;
import javax.enterprise.inject.spi.InjectionPoint;
-import javax.enterprise.util.TypeLiteral;
import org.apache.camel.CamelContext;
import org.apache.camel.ConsumerTemplate;
@@ -47,6 +40,7 @@ import org.apache.camel.ProducerTemplate;
import org.apache.camel.TypeConverter;
import org.apache.camel.component.mock.MockEndpoint;
+import static org.apache.camel.cdi.CdiEventEndpoint.eventEndpointUri;
import static org.apache.camel.cdi.CdiSpiHelper.isAnnotationType;
import static org.apache.camel.cdi.DefaultLiteral.DEFAULT;
@@ -168,20 +162,7 @@ final class CdiCamelFactory {
}
String uri = eventEndpointUri(type, ip.getQualifiers());
if (context.hasEndpoint(uri) == null) {
- // FIXME: to be replaced once event firing with dynamic parameterized type is properly supported (see https://issues.jboss.org/browse/CDI-516)
- TypeLiteral<T> literal = new TypeLiteral<T>() {
- };
- for (Field field : TypeLiteral.class.getDeclaredFields()) {
- if (field.getType().equals(Type.class)) {
- field.setAccessible(true);
- field.set(literal, type);
- break;
- }
- }
- context.addEndpoint(uri,
- new CdiEventEndpoint<>(
- event.select(literal, ip.getQualifiers().stream().toArray(Annotation[]::new)),
- uri, context, (ForwardingObserverMethod<T>) extension.getObserverMethod(ip)));
+ context.addEndpoint(uri, extension.getEventEndpoint(uri));
}
return context.getEndpoint(uri, CdiEventEndpoint.class);
}
@@ -204,30 +185,6 @@ final class CdiCamelFactory {
return instance.select(qualifiers.stream().toArray(Annotation[]::new)).get();
}
- private static String eventEndpointUri(Type type, Set<Annotation> qualifiers) {
- return "cdi-event://" + authorityFromType(type) + qualifiers.stream()
- .map(Annotation::annotationType)
- .map(Class::getCanonicalName)
- .collect(joining("%2C", qualifiers.size() > 0 ? "?qualifiers=" : "", ""));
- }
-
- private static String authorityFromType(Type type) {
- if (type instanceof Class) {
- return Class.class.cast(type).getName();
- }
- if (type instanceof ParameterizedType) {
- ParameterizedType pt = (ParameterizedType) type;
- return Stream.of(pt.getActualTypeArguments())
- .map(CdiCamelFactory::authorityFromType)
- .collect(joining("%2C", authorityFromType(pt.getRawType()) + "%3C", "%3E"));
- }
- if (type instanceof GenericArrayType) {
- GenericArrayType arrayType = (GenericArrayType) type;
- return authorityFromType(arrayType.getGenericComponentType()) + "%5B%5D";
- }
- throw new IllegalArgumentException("Cannot create URI authority for event type [" + type + "]");
- }
-
private static <T extends Annotation> Optional<T> getQualifierByType(InjectionPoint ip, Class<T> type) {
return ip.getQualifiers().stream()
.filter(isAnnotationType(type))
http://git-wip-us.apache.org/repos/asf/camel/blob/3fe56aef/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventConsumer.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventConsumer.java
index f479163..b7d7861 100644
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventConsumer.java
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventConsumer.java
@@ -24,7 +24,7 @@ import org.apache.camel.management.event.AbstractExchangeEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/* package-private */ final class CdiEventConsumer<T> extends DefaultConsumer {
+final class CdiEventConsumer<T> extends DefaultConsumer {
private final Logger logger = LoggerFactory.getLogger(CdiEventConsumer.class);
@@ -38,12 +38,12 @@ import org.slf4j.LoggerFactory;
@Override
protected void doStart() throws Exception {
super.doStart();
- endpoint.registerConsumer(this);
+ endpoint.addConsumer(this);
}
@Override
protected void doStop() throws Exception {
- endpoint.unregisterConsumer(this);
+ endpoint.removeConsumer(this);
super.doStop();
}
http://git-wip-us.apache.org/repos/asf/camel/blob/3fe56aef/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventEndpoint.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventEndpoint.java
index adfda4f..ece2bcb 100644
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventEndpoint.java
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventEndpoint.java
@@ -16,11 +16,26 @@
*/
package org.apache.camel.cdi;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Field;
+import java.lang.reflect.GenericArrayType;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.joining;
+
+import javax.enterprise.context.spi.CreationalContext;
import javax.enterprise.event.Event;
+import javax.enterprise.inject.Any;
+import javax.enterprise.inject.spi.BeanManager;
+import javax.enterprise.inject.spi.InjectionTarget;
+import javax.enterprise.util.TypeLiteral;
+import javax.inject.Inject;
-import org.apache.camel.CamelContext;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Processor;
@@ -83,21 +98,82 @@ public final class CdiEventEndpoint<T> extends DefaultEndpoint {
private final List<CdiEventConsumer<T>> consumers = new ArrayList<>();
- private final Event<T> event;
+ private final Type type;
+
+ private final Set<Annotation> qualifiers;
+
+ private final BeanManager manager;
+
+ CdiEventEndpoint(String endpointUri, Type type, Set<Annotation> qualifiers, BeanManager manager) {
+ super(endpointUri);
+ this.type = type;
+ this.qualifiers = qualifiers;
+ this.manager = manager;
+ }
+
+ static String eventEndpointUri(Type type, Set<Annotation> qualifiers) {
+ return "cdi-event://" + authorityFromType(type) + qualifiers.stream()
+ .map(CdiSpiHelper::createAnnotationId)
+ .collect(joining("%2C", qualifiers.size() > 0 ? "?qualifiers=" : "", ""));
+ }
+
+ private static String authorityFromType(Type type) {
+ if (type instanceof Class) {
+ return Class.class.cast(type).getName();
+ }
+ if (type instanceof ParameterizedType) {
+ return Stream.of(((ParameterizedType) type).getActualTypeArguments())
+ .map(CdiEventEndpoint::authorityFromType)
+ .collect(joining("%2C", authorityFromType(((ParameterizedType) type).getRawType()) + "%3C", "%3E"));
+ }
+ if (type instanceof GenericArrayType) {
+ return authorityFromType(((GenericArrayType) type).getGenericComponentType()) + "%5B%5D";
+ }
+
+ throw new IllegalArgumentException("Cannot create URI authority for event type [" + type + "]");
+ }
+
+ Set<Annotation> getQualifiers() {
+ return qualifiers;
+ }
- CdiEventEndpoint(Event<T> event, String endpointUri, CamelContext context, ForwardingObserverMethod<T> observer) {
- super(endpointUri, context);
- this.event = event;
- observer.setObserver(this);
+ Type getType() {
+ return type;
}
+ @Override
public Consumer createConsumer(Processor processor) {
return new CdiEventConsumer<>(this, processor);
}
@Override
- public Producer createProducer() {
- return new CdiEventProducer<>(this, event);
+ public Producer createProducer() throws IllegalAccessException {
+ // FIXME: to be replaced once event firing with dynamic parameterized type
+ // is properly supported (see https://issues.jboss.org/browse/CDI-516)
+ TypeLiteral<T> literal = new TypeLiteral<T>() {
+ };
+ for (Field field : TypeLiteral.class.getDeclaredFields()) {
+ if (field.getType().equals(Type.class)) {
+ field.setAccessible(true);
+ field.set(literal, type);
+ break;
+ }
+ }
+
+ InjectionTarget<AnyEvent> target = manager.createInjectionTarget(manager.createAnnotatedType(AnyEvent.class));
+ CreationalContext<AnyEvent> ctx = manager.createCreationalContext(null);
+ AnyEvent instance = target.produce(ctx);
+ target.inject(instance, ctx);
+ return new CdiEventProducer<>(this, instance.event
+ .select(literal, qualifiers.stream().toArray(Annotation[]::new)));
+ }
+
+ @Vetoed
+ private static class AnyEvent {
+
+ @Any
+ @Inject
+ private Event<Object> event;
}
@Override
@@ -105,13 +181,13 @@ public final class CdiEventEndpoint<T> extends DefaultEndpoint {
return true;
}
- void registerConsumer(CdiEventConsumer<T> consumer) {
+ void addConsumer(CdiEventConsumer<T> consumer) {
synchronized (consumers) {
consumers.add(consumer);
}
}
- void unregisterConsumer(CdiEventConsumer<T> consumer) {
+ void removeConsumer(CdiEventConsumer<T> consumer) {
synchronized (consumers) {
consumers.remove(consumer);
}
@@ -119,9 +195,7 @@ public final class CdiEventEndpoint<T> extends DefaultEndpoint {
void notify(T t) {
synchronized (consumers) {
- for (CdiEventConsumer<T> consumer : consumers) {
- consumer.notify(t);
- }
+ consumers.forEach(consumer -> consumer.notify(t));
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/3fe56aef/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventProducer.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventProducer.java
index 11cb274..5ada4db 100644
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventProducer.java
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventProducer.java
@@ -23,7 +23,7 @@ import org.apache.camel.impl.DefaultProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/* package-private */ final class CdiEventProducer<T> extends DefaultProducer {
+final class CdiEventProducer<T> extends DefaultProducer {
private final Logger logger = LoggerFactory.getLogger(CdiEventProducer.class);
http://git-wip-us.apache.org/repos/asf/camel/blob/3fe56aef/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiSpiHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiSpiHelper.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiSpiHelper.java
index 89e8b87..5071719 100644
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiSpiHelper.java
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiSpiHelper.java
@@ -47,6 +47,7 @@ import javax.enterprise.inject.spi.AnnotatedMethod;
import javax.enterprise.inject.spi.AnnotatedType;
import javax.enterprise.inject.spi.Bean;
import javax.enterprise.inject.spi.BeanManager;
+import javax.enterprise.util.Nonbinding;
import static org.apache.camel.cdi.AnyLiteral.ANY;
import static org.apache.camel.cdi.DefaultLiteral.DEFAULT;
@@ -202,11 +203,12 @@ final class CdiSpiHelper {
/**
* Generates a unique signature for an {@link Annotation}.
*/
- private static String createAnnotationId(Annotation annotation) {
+ static String createAnnotationId(Annotation annotation) {
Method[] methods = doPrivileged(
(PrivilegedAction<Method[]>) () -> annotation.annotationType().getDeclaredMethods());
return Stream.of(methods)
+ .filter(method -> !method.isAnnotationPresent(Nonbinding.class))
.sorted(comparing(Method::getName))
.collect(() -> new StringJoiner(",", "@" + annotation.annotationType().getCanonicalName() + "(", ")"),
(joiner, method) -> {
http://git-wip-us.apache.org/repos/asf/camel/blob/3fe56aef/components/camel-cdi/src/main/java/org/apache/camel/cdi/ForwardingObserverMethod.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/ForwardingObserverMethod.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/ForwardingObserverMethod.java
index b0f8a32..77394b4 100644
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/ForwardingObserverMethod.java
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/ForwardingObserverMethod.java
@@ -19,7 +19,6 @@ package org.apache.camel.cdi;
import java.lang.annotation.Annotation;
import java.lang.reflect.Type;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
import javax.enterprise.event.Reception;
import javax.enterprise.event.TransactionPhase;
import javax.enterprise.inject.spi.ObserverMethod;
@@ -28,19 +27,10 @@ import org.apache.camel.CamelContext;
final class ForwardingObserverMethod<T> implements ObserverMethod<T> {
- private final AtomicReference<CdiEventEndpoint<T>> observer = new AtomicReference<>();
+ private final CdiEventEndpoint<T> endpoint;
- private final Type type;
-
- private final Set<Annotation> qualifiers;
-
- ForwardingObserverMethod(Type type, Set<Annotation> qualifiers) {
- this.type = type;
- this.qualifiers = qualifiers;
- }
-
- void setObserver(CdiEventEndpoint<T> observer) {
- this.observer.set(observer);
+ ForwardingObserverMethod(CdiEventEndpoint<T> endpoint) {
+ this.endpoint = endpoint;
}
@Override
@@ -50,12 +40,12 @@ final class ForwardingObserverMethod<T> implements ObserverMethod<T> {
@Override
public Type getObservedType() {
- return type;
+ return endpoint.getType();
}
@Override
public Set<Annotation> getObservedQualifiers() {
- return qualifiers;
+ return endpoint.getQualifiers();
}
@Override
@@ -70,8 +60,6 @@ final class ForwardingObserverMethod<T> implements ObserverMethod<T> {
@Override
public void notify(T event) {
- if (observer.get() != null) {
- observer.get().notify(event);
- }
+ endpoint.notify(event);
}
}