You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by as...@apache.org on 2016/05/03 13:20:14 UTC

camel git commit: Camel CDI: Better handling of CDI event endpoints

Repository: camel
Updated Branches:
  refs/heads/master a9180c61f -> 3fe56aefa


Camel CDI: Better handling of CDI event endpoints


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3fe56aef
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3fe56aef
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3fe56aef

Branch: refs/heads/master
Commit: 3fe56aefac9d4454e80d0cca4bf001592d2c67c8
Parents: a9180c6
Author: Antonin Stefanutti <an...@stefanutti.fr>
Authored: Tue May 3 13:20:06 2016 +0200
Committer: Antonin Stefanutti <an...@stefanutti.fr>
Committed: Tue May 3 13:20:06 2016 +0200

----------------------------------------------------------------------
 .../org/apache/camel/cdi/CdiCamelExtension.java |  31 +++---
 .../org/apache/camel/cdi/CdiCamelFactory.java   |  47 +--------
 .../org/apache/camel/cdi/CdiEventConsumer.java  |   6 +-
 .../org/apache/camel/cdi/CdiEventEndpoint.java  | 100 ++++++++++++++++---
 .../org/apache/camel/cdi/CdiEventProducer.java  |   2 +-
 .../java/org/apache/camel/cdi/CdiSpiHelper.java |   4 +-
 .../camel/cdi/ForwardingObserverMethod.java     |  24 ++---
 7 files changed, 118 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3fe56aef/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelExtension.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelExtension.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelExtension.java
index 0c5c408..c26f09f 100755
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelExtension.java
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelExtension.java
@@ -82,6 +82,7 @@ import static org.apache.camel.cdi.AnyLiteral.ANY;
 import static org.apache.camel.cdi.ApplicationScopedLiteral.APPLICATION_SCOPED;
 import static org.apache.camel.cdi.BeanManagerHelper.getReference;
 import static org.apache.camel.cdi.BeanManagerHelper.getReferencesByType;
+import static org.apache.camel.cdi.CdiEventEndpoint.eventEndpointUri;
 import static org.apache.camel.cdi.CdiSpiHelper.getQualifiers;
 import static org.apache.camel.cdi.CdiSpiHelper.getRawType;
 import static org.apache.camel.cdi.CdiSpiHelper.hasAnnotation;
@@ -104,7 +105,7 @@ public class CdiCamelExtension implements Extension {
 
     private final Set<AnnotatedType<?>> eagerBeans = newSetFromMap(new ConcurrentHashMap<>());
 
-    private final Map<InjectionPoint, ForwardingObserverMethod<?>> cdiEventEndpoints = new ConcurrentHashMap<>();
+    private final Map<String, CdiEventEndpoint<?>> cdiEventEndpoints = new ConcurrentHashMap<>();
 
     private final Set<Bean<?>> cdiBeans = newSetFromMap(new ConcurrentHashMap<>());
 
@@ -118,8 +119,8 @@ public class CdiCamelExtension implements Extension {
 
     private final Set<ImportResource> resources = newSetFromMap(new ConcurrentHashMap<>());
 
-    ForwardingObserverMethod<?> getObserverMethod(InjectionPoint ip) {
-        return cdiEventEndpoints.get(ip);
+    CdiEventEndpoint<?> getEventEndpoint(String uri) {
+        return cdiEventEndpoints.get(uri);
     }
 
     Set<Annotation> getObserverEvents() {
@@ -218,19 +219,17 @@ public class CdiCamelExtension implements Extension {
         cdiBeans.add(pb.getBean());
     }
 
-    private void beans(@Observes ProcessBean<?> pb) {
+    private void beans(@Observes ProcessBean<?> pb, BeanManager manager) {
         cdiBeans.add(pb.getBean());
-
-        // TODO: refine the key to the type and qualifiers instead of the whole injection point as it leads to registering redundant observers
+        // Lookup for CDI event endpoint injection points
         pb.getBean().getInjectionPoints().stream()
             .filter(ip -> CdiEventEndpoint.class.equals(getRawType(ip.getType())))
             .forEach(ip -> {
-                // TODO: refine the key to the type and qualifiers instead of the whole injection point as it leads to registering redundant observers
-                if (ip.getType() instanceof ParameterizedType) {
-                    cdiEventEndpoints.put(ip, new ForwardingObserverMethod<>(((ParameterizedType) ip.getType()).getActualTypeArguments()[0], ip.getQualifiers()));
-                } else if (ip.getType() instanceof Class) {
-                    cdiEventEndpoints.put(ip, new ForwardingObserverMethod<>(Object.class, ip.getQualifiers()));
-                }
+                Type type = ip.getType() instanceof ParameterizedType
+                    ? ((ParameterizedType) ip.getType()).getActualTypeArguments()[0]
+                    : Object.class;
+                String uri = eventEndpointUri(type, ip.getQualifiers());
+                cdiEventEndpoints.put(uri, new CdiEventEndpoint<>(uri, type, ip.getQualifiers(), manager));
             });
     }
 
@@ -298,8 +297,8 @@ public class CdiCamelExtension implements Extension {
         extraBeans.forEach(abd::addBean);
 
         // Update the CDI Camel factory beans
-        Set<Annotation> endpointQualifiers = cdiEventEndpoints.keySet().stream()
-            .map(InjectionPoint::getQualifiers)
+        Set<Annotation> endpointQualifiers = cdiEventEndpoints.values().stream()
+            .map(CdiEventEndpoint::getQualifiers)
             .flatMap(Set::stream)
             .collect(toSet());
         Set<Annotation> templateQualifiers = contextQualifiers.stream()
@@ -315,7 +314,9 @@ public class CdiCamelExtension implements Extension {
             .forEach(abd::addBean);
 
         // Add CDI event endpoint observer methods
-        cdiEventEndpoints.values().forEach(abd::addObserverMethod);
+        cdiEventEndpoints.values().stream()
+            .map(ForwardingObserverMethod::new)
+            .forEach(abd::addObserverMethod);
     }
 
     private boolean shouldDeployDefaultCamelContext(Set<Bean<?>> beans) {

http://git-wip-us.apache.org/repos/asf/camel/blob/3fe56aef/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelFactory.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelFactory.java
index 8644400..b0e87aa 100755
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelFactory.java
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiCamelFactory.java
@@ -17,17 +17,11 @@
 package org.apache.camel.cdi;
 
 import java.lang.annotation.Annotation;
-import java.lang.reflect.Field;
-import java.lang.reflect.GenericArrayType;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Stream;
-
-import static java.util.stream.Collectors.joining;
 
 import javax.enterprise.event.Event;
 import javax.enterprise.inject.Any;
@@ -38,7 +32,6 @@ import javax.enterprise.inject.Produces;
 import javax.enterprise.inject.Typed;
 import javax.enterprise.inject.UnsatisfiedResolutionException;
 import javax.enterprise.inject.spi.InjectionPoint;
-import javax.enterprise.util.TypeLiteral;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.ConsumerTemplate;
@@ -47,6 +40,7 @@ import org.apache.camel.ProducerTemplate;
 import org.apache.camel.TypeConverter;
 import org.apache.camel.component.mock.MockEndpoint;
 
+import static org.apache.camel.cdi.CdiEventEndpoint.eventEndpointUri;
 import static org.apache.camel.cdi.CdiSpiHelper.isAnnotationType;
 import static org.apache.camel.cdi.DefaultLiteral.DEFAULT;
 
@@ -168,20 +162,7 @@ final class CdiCamelFactory {
         }
         String uri = eventEndpointUri(type, ip.getQualifiers());
         if (context.hasEndpoint(uri) == null) {
-            // FIXME: to be replaced once event firing with dynamic parameterized type is properly supported (see https://issues.jboss.org/browse/CDI-516)
-            TypeLiteral<T> literal = new TypeLiteral<T>() {
-            };
-            for (Field field : TypeLiteral.class.getDeclaredFields()) {
-                if (field.getType().equals(Type.class)) {
-                    field.setAccessible(true);
-                    field.set(literal, type);
-                    break;
-                }
-            }
-            context.addEndpoint(uri,
-                new CdiEventEndpoint<>(
-                    event.select(literal, ip.getQualifiers().stream().toArray(Annotation[]::new)),
-                    uri, context, (ForwardingObserverMethod<T>) extension.getObserverMethod(ip)));
+            context.addEndpoint(uri, extension.getEventEndpoint(uri));
         }
         return context.getEndpoint(uri, CdiEventEndpoint.class);
     }
@@ -204,30 +185,6 @@ final class CdiCamelFactory {
         return instance.select(qualifiers.stream().toArray(Annotation[]::new)).get();
     }
 
-    private static String eventEndpointUri(Type type, Set<Annotation> qualifiers) {
-        return "cdi-event://" + authorityFromType(type) + qualifiers.stream()
-            .map(Annotation::annotationType)
-            .map(Class::getCanonicalName)
-            .collect(joining("%2C", qualifiers.size() > 0 ? "?qualifiers=" : "", ""));
-    }
-
-    private static String authorityFromType(Type type) {
-        if (type instanceof Class) {
-            return Class.class.cast(type).getName();
-        }
-        if (type instanceof ParameterizedType) {
-            ParameterizedType pt = (ParameterizedType) type;
-            return Stream.of(pt.getActualTypeArguments())
-                .map(CdiCamelFactory::authorityFromType)
-                .collect(joining("%2C", authorityFromType(pt.getRawType()) + "%3C", "%3E"));
-        }
-        if (type instanceof GenericArrayType) {
-            GenericArrayType arrayType = (GenericArrayType) type;
-            return authorityFromType(arrayType.getGenericComponentType()) + "%5B%5D";
-        }
-        throw new IllegalArgumentException("Cannot create URI authority for event type [" + type + "]");
-    }
-
     private static <T extends Annotation> Optional<T> getQualifierByType(InjectionPoint ip, Class<T> type) {
         return ip.getQualifiers().stream()
             .filter(isAnnotationType(type))

http://git-wip-us.apache.org/repos/asf/camel/blob/3fe56aef/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventConsumer.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventConsumer.java
index f479163..b7d7861 100644
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventConsumer.java
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventConsumer.java
@@ -24,7 +24,7 @@ import org.apache.camel.management.event.AbstractExchangeEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/* package-private */ final class CdiEventConsumer<T> extends DefaultConsumer {
+final class CdiEventConsumer<T> extends DefaultConsumer {
 
     private final Logger logger = LoggerFactory.getLogger(CdiEventConsumer.class);
 
@@ -38,12 +38,12 @@ import org.slf4j.LoggerFactory;
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        endpoint.registerConsumer(this);
+        endpoint.addConsumer(this);
     }
 
     @Override
     protected void doStop() throws Exception {
-        endpoint.unregisterConsumer(this);
+        endpoint.removeConsumer(this);
         super.doStop();
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/3fe56aef/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventEndpoint.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventEndpoint.java
index adfda4f..ece2bcb 100644
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventEndpoint.java
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventEndpoint.java
@@ -16,11 +16,26 @@
  */
 package org.apache.camel.cdi;
 
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Field;
+import java.lang.reflect.GenericArrayType;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.joining;
+
+import javax.enterprise.context.spi.CreationalContext;
 import javax.enterprise.event.Event;
+import javax.enterprise.inject.Any;
+import javax.enterprise.inject.spi.BeanManager;
+import javax.enterprise.inject.spi.InjectionTarget;
+import javax.enterprise.util.TypeLiteral;
+import javax.inject.Inject;
 
-import org.apache.camel.CamelContext;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Processor;
@@ -83,21 +98,82 @@ public final class CdiEventEndpoint<T> extends DefaultEndpoint {
 
     private final List<CdiEventConsumer<T>> consumers = new ArrayList<>();
 
-    private final Event<T> event;
+    private final Type type;
+
+    private final Set<Annotation> qualifiers;
+
+    private final BeanManager manager;
+
+    CdiEventEndpoint(String endpointUri, Type type, Set<Annotation> qualifiers, BeanManager manager) {
+        super(endpointUri);
+        this.type = type;
+        this.qualifiers = qualifiers;
+        this.manager = manager;
+    }
+
+    static String eventEndpointUri(Type type, Set<Annotation> qualifiers) {
+        return "cdi-event://" + authorityFromType(type) + qualifiers.stream()
+            .map(CdiSpiHelper::createAnnotationId)
+            .collect(joining("%2C", qualifiers.size() > 0 ? "?qualifiers=" : "", ""));
+    }
+
+    private static String authorityFromType(Type type) {
+        if (type instanceof Class) {
+            return Class.class.cast(type).getName();
+        }
+        if (type instanceof ParameterizedType) {
+            return Stream.of(((ParameterizedType) type).getActualTypeArguments())
+                .map(CdiEventEndpoint::authorityFromType)
+                .collect(joining("%2C", authorityFromType(((ParameterizedType) type).getRawType()) + "%3C", "%3E"));
+        }
+        if (type instanceof GenericArrayType) {
+            return authorityFromType(((GenericArrayType) type).getGenericComponentType()) + "%5B%5D";
+        }
+
+        throw new IllegalArgumentException("Cannot create URI authority for event type [" + type + "]");
+    }
+
+    Set<Annotation> getQualifiers() {
+        return qualifiers;
+    }
 
-    CdiEventEndpoint(Event<T> event, String endpointUri, CamelContext context, ForwardingObserverMethod<T> observer) {
-        super(endpointUri, context);
-        this.event = event;
-        observer.setObserver(this);
+    Type getType() {
+        return type;
     }
 
+    @Override
     public Consumer createConsumer(Processor processor) {
         return new CdiEventConsumer<>(this, processor);
     }
 
     @Override
-    public Producer createProducer() {
-        return new CdiEventProducer<>(this, event);
+    public Producer createProducer() throws IllegalAccessException {
+        // FIXME: to be replaced once event firing with dynamic parameterized type
+        // is properly supported (see https://issues.jboss.org/browse/CDI-516)
+        TypeLiteral<T> literal = new TypeLiteral<T>() {
+        };
+        for (Field field : TypeLiteral.class.getDeclaredFields()) {
+            if (field.getType().equals(Type.class)) {
+                field.setAccessible(true);
+                field.set(literal, type);
+                break;
+            }
+        }
+
+        InjectionTarget<AnyEvent> target = manager.createInjectionTarget(manager.createAnnotatedType(AnyEvent.class));
+        CreationalContext<AnyEvent> ctx = manager.createCreationalContext(null);
+        AnyEvent instance = target.produce(ctx);
+        target.inject(instance, ctx);
+        return new CdiEventProducer<>(this, instance.event
+            .select(literal, qualifiers.stream().toArray(Annotation[]::new)));
+    }
+
+    @Vetoed
+    private static class AnyEvent {
+
+        @Any
+        @Inject
+        private Event<Object> event;
     }
 
     @Override
@@ -105,13 +181,13 @@ public final class CdiEventEndpoint<T> extends DefaultEndpoint {
         return true;
     }
 
-    void registerConsumer(CdiEventConsumer<T> consumer) {
+    void addConsumer(CdiEventConsumer<T> consumer) {
         synchronized (consumers) {
             consumers.add(consumer);
         }
     }
 
-    void unregisterConsumer(CdiEventConsumer<T> consumer) {
+    void removeConsumer(CdiEventConsumer<T> consumer) {
         synchronized (consumers) {
             consumers.remove(consumer);
         }
@@ -119,9 +195,7 @@ public final class CdiEventEndpoint<T> extends DefaultEndpoint {
 
     void notify(T t) {
         synchronized (consumers) {
-            for (CdiEventConsumer<T> consumer : consumers) {
-                consumer.notify(t);
-            }
+            consumers.forEach(consumer -> consumer.notify(t));
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/3fe56aef/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventProducer.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventProducer.java
index 11cb274..5ada4db 100644
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventProducer.java
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiEventProducer.java
@@ -23,7 +23,7 @@ import org.apache.camel.impl.DefaultProducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/* package-private */ final class CdiEventProducer<T> extends DefaultProducer {
+final class CdiEventProducer<T> extends DefaultProducer {
 
     private final Logger logger = LoggerFactory.getLogger(CdiEventProducer.class);
 

http://git-wip-us.apache.org/repos/asf/camel/blob/3fe56aef/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiSpiHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiSpiHelper.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiSpiHelper.java
index 89e8b87..5071719 100644
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiSpiHelper.java
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/CdiSpiHelper.java
@@ -47,6 +47,7 @@ import javax.enterprise.inject.spi.AnnotatedMethod;
 import javax.enterprise.inject.spi.AnnotatedType;
 import javax.enterprise.inject.spi.Bean;
 import javax.enterprise.inject.spi.BeanManager;
+import javax.enterprise.util.Nonbinding;
 
 import static org.apache.camel.cdi.AnyLiteral.ANY;
 import static org.apache.camel.cdi.DefaultLiteral.DEFAULT;
@@ -202,11 +203,12 @@ final class CdiSpiHelper {
     /**
      * Generates a unique signature for an {@link Annotation}.
      */
-    private static String createAnnotationId(Annotation annotation) {
+    static String createAnnotationId(Annotation annotation) {
         Method[] methods = doPrivileged(
             (PrivilegedAction<Method[]>) () -> annotation.annotationType().getDeclaredMethods());
 
         return Stream.of(methods)
+            .filter(method -> !method.isAnnotationPresent(Nonbinding.class))
             .sorted(comparing(Method::getName))
             .collect(() -> new StringJoiner(",", "@" + annotation.annotationType().getCanonicalName() + "(", ")"),
                 (joiner, method) -> {

http://git-wip-us.apache.org/repos/asf/camel/blob/3fe56aef/components/camel-cdi/src/main/java/org/apache/camel/cdi/ForwardingObserverMethod.java
----------------------------------------------------------------------
diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/ForwardingObserverMethod.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/ForwardingObserverMethod.java
index b0f8a32..77394b4 100644
--- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/ForwardingObserverMethod.java
+++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/ForwardingObserverMethod.java
@@ -19,7 +19,6 @@ package org.apache.camel.cdi;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Type;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
 import javax.enterprise.event.Reception;
 import javax.enterprise.event.TransactionPhase;
 import javax.enterprise.inject.spi.ObserverMethod;
@@ -28,19 +27,10 @@ import org.apache.camel.CamelContext;
 
 final class ForwardingObserverMethod<T> implements ObserverMethod<T> {
 
-    private final AtomicReference<CdiEventEndpoint<T>> observer = new AtomicReference<>();
+    private final CdiEventEndpoint<T> endpoint;
 
-    private final Type type;
-
-    private final Set<Annotation> qualifiers;
-
-    ForwardingObserverMethod(Type type, Set<Annotation> qualifiers) {
-        this.type = type;
-        this.qualifiers = qualifiers;
-    }
-
-    void setObserver(CdiEventEndpoint<T> observer) {
-        this.observer.set(observer);
+    ForwardingObserverMethod(CdiEventEndpoint<T> endpoint) {
+        this.endpoint = endpoint;
     }
 
     @Override
@@ -50,12 +40,12 @@ final class ForwardingObserverMethod<T> implements ObserverMethod<T> {
 
     @Override
     public Type getObservedType() {
-        return type;
+        return endpoint.getType();
     }
 
     @Override
     public Set<Annotation> getObservedQualifiers() {
-        return qualifiers;
+        return endpoint.getQualifiers();
     }
 
     @Override
@@ -70,8 +60,6 @@ final class ForwardingObserverMethod<T> implements ObserverMethod<T> {
 
     @Override
     public void notify(T event) {
-        if (observer.get() != null) {
-            observer.get().notify(event);
-        }
+        endpoint.notify(event);
     }
 }