You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwebbeans.apache.org by db...@apache.org on 2009/10/07 05:10:50 UTC

svn commit: r822570 - in /incubator/openwebbeans/trunk/webbeans-impl/src: main/java/org/apache/webbeans/event/ main/java/org/apache/webbeans/util/ test/java/org/apache/webbeans/test/unittests/portable/events/

Author: dblevins
Date: Wed Oct  7 03:10:50 2009
New Revision: 822570

URL: http://svn.apache.org/viewvc?rev=822570&view=rev
Log:
OWB-139: Transactional Observer Methods
Reworked the code to guarantee the correct number javax.transaction.Synchronization registrations and subsequent event firings.

Removed:
    incubator/openwebbeans/trunk/webbeans-impl/src/main/java/org/apache/webbeans/event/ObserverWrapper.java
    incubator/openwebbeans/trunk/webbeans-impl/src/main/java/org/apache/webbeans/event/TransactionalNotifier.java
Modified:
    incubator/openwebbeans/trunk/webbeans-impl/src/main/java/org/apache/webbeans/event/NotificationManager.java
    incubator/openwebbeans/trunk/webbeans-impl/src/main/java/org/apache/webbeans/util/ArrayUtil.java
    incubator/openwebbeans/trunk/webbeans-impl/src/test/java/org/apache/webbeans/test/unittests/portable/events/ExtensionTest.java

Modified: incubator/openwebbeans/trunk/webbeans-impl/src/main/java/org/apache/webbeans/event/NotificationManager.java
URL: http://svn.apache.org/viewvc/incubator/openwebbeans/trunk/webbeans-impl/src/main/java/org/apache/webbeans/event/NotificationManager.java?rev=822570&r1=822569&r2=822570&view=diff
==============================================================================
--- incubator/openwebbeans/trunk/webbeans-impl/src/main/java/org/apache/webbeans/event/NotificationManager.java (original)
+++ incubator/openwebbeans/trunk/webbeans-impl/src/main/java/org/apache/webbeans/event/NotificationManager.java Wed Oct  7 03:10:50 2009
@@ -16,18 +16,15 @@
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Method;
 import java.lang.reflect.Type;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
 
 import javax.enterprise.event.Reception;
 import javax.enterprise.event.Observer;
 import javax.enterprise.event.ObserverException;
 import javax.enterprise.event.Observes;
 import javax.enterprise.inject.TypeLiteral;
+import javax.enterprise.inject.Default;
 import javax.transaction.Status;
 import javax.transaction.Synchronization;
 import javax.transaction.Transaction;
@@ -42,16 +39,16 @@
 import org.apache.webbeans.util.AnnotationUtil;
 import org.apache.webbeans.util.Asserts;
 import org.apache.webbeans.util.ClassUtil;
+import org.apache.webbeans.util.ArrayUtil;
+import static org.apache.webbeans.util.ArrayUtil.asSet;
 
 @SuppressWarnings("unchecked")
-public final class NotificationManager implements Synchronization
+public final class NotificationManager
 {
     private static final WebBeansLogger logger = WebBeansLogger.getLogger(NotificationManager.class);
 
     private final Map<Type, Set<ObserverWrapper<?>>> observers = new ConcurrentHashMap<Type, Set<ObserverWrapper<?>>>();
 
-    private final Set<TransactionalNotifier> transactionSet = new CopyOnWriteArraySet<TransactionalNotifier>();
-    
     private final TransactionService transactionService = ServiceLoader.getService(TransactionService.class);
 
     public NotificationManager()
@@ -68,15 +65,8 @@
 
     public <T> void addObserver(Observer<T> observer, Type eventType, Annotation... annotations)
     {
-        addObserver(observer, false, TransactionalObserverType.NONE, eventType, annotations);
-    }
-
-    public <T> void addObserver(Observer<T> observer, boolean ifExist, TransactionalObserverType type, Type eventType, Annotation... annotations)
-    {
         EventUtil.checkEventBindings(annotations);
 
-        ObserverWrapper<T> observerImpl = new ObserverWrapper<T>(observer, ifExist, type, eventType, annotations);
-
         Set<ObserverWrapper<?>> set = observers.get(eventType);
         if (set == null)
         {
@@ -84,14 +74,14 @@
             observers.put(eventType, set);
         }
 
-        set.add(observerImpl);
+        set.add(new ObserverWrapper<T>(observer, annotations));
     }
 
     public <T> void addObserver(Observer<T> observer, TypeLiteral<T> typeLiteral, Annotation... annotations)
     {
         EventUtil.checkEventType(typeLiteral.getRawType());
 
-        addObserver(observer, false, TransactionalObserverType.NONE, typeLiteral.getRawType(), annotations);
+        addObserver(observer, typeLiteral.getRawType(), annotations);
     }
 
     public <T> void removeObserver(Observer<T> observer, Class<T> eventType, Annotation... annotations)
@@ -106,7 +96,7 @@
             {
                 Observer<T> ob = (Observer<T>) s.getObserver();
 
-                Set<Annotation> evenBindings = s.getEventQualifiers();
+                Set<Annotation> evenBindings = s.getQualifiers();
                 Annotation[] anns = new Annotation[evenBindings.size()];
                 anns = evenBindings.toArray(anns);
 
@@ -123,21 +113,27 @@
         removeObserver(observer, typeLiteral.getRawType(), annotations);
     }
 
-    public <T> Set<Observer<T>> resolveObservers(T event, Annotation... bindings)
+    public <T> Set<Observer<T>> resolveObservers(T event, Annotation... eventQualifiers)
     {
-
-        Set<ObserverWrapper<?>> resolvedSet = new HashSet<ObserverWrapper<?>>();
-        Set<Observer<T>> unres = new HashSet<Observer<T>>();
+        EventUtil.checkEventBindings(eventQualifiers);
 
         Class<T> eventType = (Class<T>) event.getClass();
-        
-        
+        //EventUtil.checkEventType(eventType);
+
+        Set<ObserverWrapper<T>> wrappers = filterByType(eventType);
+
+        wrappers = filterByQualifiers(wrappers, eventQualifiers);
+
+        return unwrap(wrappers);
+    }
+
+    private <T> Set<ObserverWrapper<T>> filterByType(Class<T> eventType)
+    {
+        Set<ObserverWrapper<T>> matching = new HashSet<ObserverWrapper<T>>();
+
         Set<Type> types = new HashSet<Type>();
         ClassUtil.setTypeHierarchy(types, eventType);
 
-        //EventUtil.checkEventType(eventType);
-        EventUtil.checkEventBindings(bindings);
-
         Set<Type> keySet = this.observers.keySet();
 
         for (Type type : keySet)
@@ -146,64 +142,94 @@
             {
                 if (ClassUtil.isAssignable(check, type))
                 {
-                    resolvedSet.addAll(this.observers.get(type));
+                    Set<ObserverWrapper<?>> wrappers = this.observers.get(type);
+
+                    for (ObserverWrapper<?> wrapper : wrappers)
+                    {
+                        matching.add((ObserverWrapper<T>) wrapper);
+                    }
                     break;
                 }
             }
         }
+        return matching;
+    }
+
+    private <T> Set<ObserverWrapper<T>> filterByQualifiers(Set<ObserverWrapper<T>> wrappers, Annotation... annotations)
+    {
+        Set<Annotation> eventQualifiers = toQualiferSet(annotations);
 
-        for (ObserverWrapper<?> impl : resolvedSet)
+        Set<ObserverWrapper<T>> matching = new HashSet<ObserverWrapper<T>>();
+
+        search: for (ObserverWrapper<T> wrapper : wrappers)
         {
-            if (impl.isObserverOfQualifiers(bindings))
+            Set<Annotation> qualifiers = wrapper.getQualifiers();
+
+            if (qualifiers.size() > eventQualifiers.size())
             {
-                unres.add(((ObserverWrapper<T>) impl).getObserver());
+                continue;
             }
-        }
 
-        return unres;
+            for (Annotation qualifier : qualifiers)
+            {
+                if (!eventQualifiers.contains(qualifier))
+                {
+                    continue search;
+                }
+            }
+
+            matching.add(wrapper);
+        }
+        
+        return matching;
     }
 
-    public void fireEvent(Object event, Annotation... bindings)
+    private <T> Set<Observer<T>> unwrap(Set<ObserverWrapper<T>> wrappers)
+    {
+        Set<Observer<T>> observers = new HashSet<Observer<T>>();
+
+        for (ObserverWrapper<T> wrapper : wrappers)
+        {
+            observers.add(wrapper.getObserver());
+        }
+
+        return observers;
+    }
+    
+    public void fireEvent(Object event, Annotation... qualifiers)
     {
         Transaction transaction = transactionService.getTransaction();
 
-        Set<Observer<Object>> observers = resolveObservers(event, bindings);
+        Set<Observer<Object>> observers = resolveObservers(event, qualifiers);
 
-        TransactionalNotifier transNotifier = null;
         for (Observer<Object> observer: observers)
         {
             try
             {
                 if (transaction != null && isTransactional(observer))
                 {
-
-                    // TODO: we only need to register once
-                    transaction.registerSynchronization(this);
-
-                    if (transNotifier == null)
-                    {
-                        transNotifier = new TransactionalNotifier(event);
-                        this.transactionSet.add(transNotifier);
-                    }
-
-                    // Register for transaction
                     BeanObserverImpl<Object> beanObserver = (BeanObserverImpl<Object>) observer;
+
                     TransactionalObserverType type = beanObserver.getType();
+
                     if (type.equals(TransactionalObserverType.AFTER_TRANSACTION_COMPLETION))
                     {
-                        transNotifier.addAfterCompletionObserver(observer);
+                        transaction.registerSynchronization(new AfterCompletion(observer, event));
                     }
                     else if (type.equals(TransactionalObserverType.AFTER_TRANSACTION_SUCCESS))
                     {
-                        transNotifier.addAfterCompletionSuccessObserver(observer);
+                        transaction.registerSynchronization(new AfterCompletionSuccess(observer, event));
                     }
                     else if (type.equals(TransactionalObserverType.AFTER_TRANSACTION_FAILURE))
                     {
-                        transNotifier.addAfterCompletionFailureObserver(observer);
+                        transaction.registerSynchronization(new AfterCompletionFailure(observer, event));
                     }
                     else if (type.equals(TransactionalObserverType.BEFORE_TRANSACTION_COMPLETION))
                     {
-                        transNotifier.addBeforeCompletionObserver(observer);
+                        transaction.registerSynchronization(new BeforeCompletion(observer, event));
+                    }
+                    else {
+                        throw new IllegalStateException("TransactionalObserverType not supported: " + type);
                     }
                 }
                 else
@@ -261,7 +287,7 @@
         {
             Observes observes = AnnotationUtil.getMethodFirstParameterAnnotation(observableMethod, Observes.class);
 
-            Annotation[] bindingTypes = AnnotationUtil.getMethodFirstParameterQualifierWithGivenAnnotation(observableMethod, Observes.class);
+            Annotation[] qualifiers = AnnotationUtil.getMethodFirstParameterQualifierWithGivenAnnotation(observableMethod, Observes.class);
 
             boolean ifExist = false;
 
@@ -276,56 +302,146 @@
 
             Class<T> clazz = (Class<T>) AnnotationUtil.getMethodFirstParameterTypeClazzWithAnnotation(observableMethod, Observes.class);
 
-            addObserver(observer, ifExist, type, clazz, bindingTypes);
+            addObserver(observer, clazz, qualifiers);
         }
 
     }
 
-    public void afterCompletion(int status)
+    private static Set<Annotation> toQualiferSet(Annotation... qualifiers)
     {
-        try
-        {
+        Set<Annotation> set = ArrayUtil.asSet(qualifiers);
 
-            for (TransactionalNotifier notifier : this.transactionSet)
+        return pruneDefault(set);
+    }
+    
+    private static Set<Annotation> pruneDefault(Set<Annotation> set) {
+        Iterator<Annotation> iterator = set.iterator();
+        while (iterator.hasNext()) {
+            if (iterator.next() instanceof Default)
             {
-                notifier.notifyAfterCompletion();
+                iterator.remove();
+            }
+        }
+        return set;
+    }
 
-                if (status == Status.STATUS_COMMITTED)
-                {
-                    notifier.notifyAfterCompletionSuccess();
+    /**
+     * Wrapper around the {@link javax.enterprise.event.Observer} instance.
+     *
+     * @param <T> generic event type
+     */
+    private static class ObserverWrapper<T>
+    {
+        /** Event qualifiers appearing on the parameter */
+        private final Set<Annotation> qualifiers;
 
-                }
-                else if (status == Status.STATUS_ROLLEDBACK)
-                {
-                    notifier.notifyAfterCompletionFailure();
-                }
+        /**Wrapped observer instance*/
+        private final Observer<T> observer;
+
+        public ObserverWrapper(Observer<T> observer, Annotation... qualifiers)
+        {
+            this.qualifiers = toQualiferSet(qualifiers);
+            this.qualifiers.remove(Default.class);
+            this.observer = observer;
+        }
+
+        public Set<Annotation> getQualifiers()
+        {
+            return this.qualifiers;
+        }
+
+        public Observer<T> getObserver()
+        {
+            return observer;
+        }
+    }
+
+    private static class AbstractSynchronization<T> implements Synchronization {
+
+        private final Observer<T> observer;
+        private final T event;
+
+        public AbstractSynchronization(Observer<T> observer, T event)
+        {
+            this.observer = observer;
+            this.event = event;
+        }
+
+        public void beforeCompletion()
+        {
+        }
+
+        public void afterCompletion(int i)
+        {
+        }
+
+        public void notifyObserver() {
+            try
+            {
+                observer.notify(event);
+            }
+            catch (Exception e)
+            {
+                logger.error("Exception is occured in the transactional observer ", e);
             }
+        }
+    }
+
+    private static class BeforeCompletion extends AbstractSynchronization {
+        private BeforeCompletion(Observer observer, Object event)
+        {
+            super(observer, event);
+        }
 
+        @Override
+        public void beforeCompletion()
+        {
+            notifyObserver();
         }
-        catch (Exception e)
+    }
+
+    private static class AfterCompletion extends AbstractSynchronization {
+        private AfterCompletion(Observer observer, Object event)
         {
-            logger.error("Exception is occured in the transational observer ", e);
+            super(observer, event);
         }
-        finally
+
+        @Override
+        public void afterCompletion(int i)
         {
-            this.transactionSet.clear();
+            notifyObserver();
         }
     }
 
-    public void beforeCompletion()
-    {
-        // Call @BeforeTransactionCompletion
-        try
+    private static class AfterCompletionSuccess extends AbstractSynchronization {
+        private AfterCompletionSuccess(Observer observer, Object event)
+        {
+            super(observer, event);
+        }
+
+        @Override
+        public void afterCompletion(int i)
         {
-            for (TransactionalNotifier notifier : this.transactionSet)
+            if (i == Status.STATUS_COMMITTED)
             {
-                notifier.notifyBeforeCompletion();
+                notifyObserver();
             }
+        }
+    }
 
+    private static class AfterCompletionFailure extends AbstractSynchronization {
+        private AfterCompletionFailure(Observer observer, Object event)
+        {
+            super(observer, event);
         }
-        catch (Exception e)
+
+        @Override
+        public void afterCompletion(int i)
         {
-            logger.error("Exception is occured in the transational observer ", e);
+            if (i != Status.STATUS_COMMITTED)
+            {
+                notifyObserver();
+            }
         }
     }
 }
\ No newline at end of file

Modified: incubator/openwebbeans/trunk/webbeans-impl/src/main/java/org/apache/webbeans/util/ArrayUtil.java
URL: http://svn.apache.org/viewvc/incubator/openwebbeans/trunk/webbeans-impl/src/main/java/org/apache/webbeans/util/ArrayUtil.java?rev=822570&r1=822569&r2=822570&view=diff
==============================================================================
--- incubator/openwebbeans/trunk/webbeans-impl/src/main/java/org/apache/webbeans/util/ArrayUtil.java (original)
+++ incubator/openwebbeans/trunk/webbeans-impl/src/main/java/org/apache/webbeans/util/ArrayUtil.java Wed Oct  7 03:10:50 2009
@@ -13,6 +13,9 @@
  */
 package org.apache.webbeans.util;
 
+import java.util.Set;
+import java.util.HashSet;
+
 public final class ArrayUtil
 {
 
@@ -83,4 +86,15 @@
         return true;
     }
 
+    public static <T> Set<T> asSet(T... items)
+    {
+        Set<T> set = new HashSet<T>();
+
+        for(T item : items)
+        {
+            set.add(item);
+        }
+
+        return set;
+    }
 }

Modified: incubator/openwebbeans/trunk/webbeans-impl/src/test/java/org/apache/webbeans/test/unittests/portable/events/ExtensionTest.java
URL: http://svn.apache.org/viewvc/incubator/openwebbeans/trunk/webbeans-impl/src/test/java/org/apache/webbeans/test/unittests/portable/events/ExtensionTest.java?rev=822570&r1=822569&r2=822570&view=diff
==============================================================================
    (empty)