You are viewing a plain text version of this content. The canonical link for it is here.
Posted to muse-commits@ws.apache.org by da...@apache.org on 2007/01/18 19:26:40 UTC

svn commit: r497524 - /webservices/muse/trunk/modules/muse-wsn-impl/src/org/apache/muse/ws/notification/impl/SimpleNotificationConsumer.java

Author: danj
Date: Thu Jan 18 10:26:39 2007
New Revision: 497524

URL: http://svn.apache.org/viewvc?view=rev&rev=497524
Log:
Added concept of listeners-by-topic to avoid potential performance problems documented here:

http://marc.theaimsgroup.com/?t=116803718800001&r=1&w=2


Modified:
    webservices/muse/trunk/modules/muse-wsn-impl/src/org/apache/muse/ws/notification/impl/SimpleNotificationConsumer.java

Modified: webservices/muse/trunk/modules/muse-wsn-impl/src/org/apache/muse/ws/notification/impl/SimpleNotificationConsumer.java
URL: http://svn.apache.org/viewvc/webservices/muse/trunk/modules/muse-wsn-impl/src/org/apache/muse/ws/notification/impl/SimpleNotificationConsumer.java?view=diff&rev=497524&r1=497523&r2=497524
==============================================================================
--- webservices/muse/trunk/modules/muse-wsn-impl/src/org/apache/muse/ws/notification/impl/SimpleNotificationConsumer.java (original)
+++ webservices/muse/trunk/modules/muse-wsn-impl/src/org/apache/muse/ws/notification/impl/SimpleNotificationConsumer.java Thu Jan 18 10:26:39 2007
@@ -20,14 +20,19 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.Map;
+
+import javax.xml.namespace.QName;
 
 import org.apache.muse.core.AbstractCapability;
 import org.apache.muse.core.serializer.SerializerRegistry;
 import org.apache.muse.util.LoggingUtils;
+import org.apache.muse.util.MultiMap;
 import org.apache.muse.ws.addressing.soap.SoapFault;
 import org.apache.muse.ws.notification.NotificationConsumer;
 import org.apache.muse.ws.notification.NotificationMessage;
 import org.apache.muse.ws.notification.NotificationMessageListener;
+import org.apache.muse.ws.notification.TopicListener;
 
 /**
  *
@@ -43,20 +48,40 @@
     extends AbstractCapability implements NotificationConsumer
 {    
     //
-    // Event consumers, ordered by WS-N SimpleTopic
+    // listeners that can be applied to any message
     //
     private Collection _messageListeners = new ArrayList();
     
+    //
+    // listeners that can be applied to a message with a specific topic
+    //
+    private Map _topicListeners = new MultiMap();
+    
     public void addMessageListener(NotificationMessageListener listener)
     {
         _messageListeners.add(listener);
     }
     
+    public void addTopicListener(TopicListener listener)
+    {
+        _topicListeners.put(listener.getTopic(), listener);
+    }
+    
     public Collection getMessageListeners()
     {
         return Collections.unmodifiableCollection(_messageListeners);
     }
     
+    public Collection getTopicListeners(QName topic)
+    {
+        Collection registered = (Collection)_topicListeners.get(topic);
+        
+        if (registered != null)
+            return Collections.unmodifiableCollection(registered);
+        
+        return Collections.EMPTY_LIST; // nothing registered for this topic
+    }
+    
     public void initialize() 
         throws SoapFault
     {
@@ -88,6 +113,13 @@
         _messageListeners.remove(listener);
     }
     
+    public void removeTopicListener(TopicListener listener)
+    {
+        Collection listeners = (Collection)_topicListeners.get(listener.getTopic());
+        if (listeners != null)  // null when nothing was ever added for the topic
+            listeners.remove(listener);
+    }
+    
     /**
      * 
      * NotifyThread is a simple thread that iterates over the collection of 
@@ -105,27 +137,66 @@
             _messages = messages;
         }
         
-        public void run()
+        private void processMessageListeners(NotificationMessage message)
         {
-            for (int n = 0; n < _messages.length; ++n)
-            {            
-                Iterator i = getMessageListeners().iterator();
+            Iterator i = getMessageListeners().iterator();
+            
+            while (i.hasNext())
+            {
+                NotificationMessageListener listener = (NotificationMessageListener)i.next();
                 
-                while (i.hasNext())
+                try
                 {
-                    NotificationMessageListener listener = (NotificationMessageListener)i.next();
-                    
-                    try
-                    {
-                        if (listener.accepts(_messages[n]))
-                            listener.process(_messages[n]);
-                    }
-                    
-                    catch (Throwable error)
-                    {
-                        LoggingUtils.logError(getLog(), error);
-                    }
+                    if (listener.accepts(message))
+                        listener.process(message);
                 }
+                
+                catch (Throwable error)
+                {
+                    LoggingUtils.logError(getLog(), error);
+                }
+            }
+        }
+        
+        private void processTopicListeners(NotificationMessage message)
+        {
+            // deliver the message to every listener added for its topic
+            Collection subscribers = getTopicListeners(message.getTopic());
+            
+            for (Iterator cursor = subscribers.iterator(); cursor.hasNext(); )
+            {
+                NotificationMessageListener subscriber = (NotificationMessageListener)cursor.next();
+                
+                try
+                {
+                    //
+                    // accepts() is skipped on purpose - a listener that 
+                    // was added for the topic is assumed to want every 
+                    // message published on it
+                    //
+                    subscriber.process(message);
+                }
+                
+                catch (Throwable error)
+                {
+                    LoggingUtils.logError(getLog(), error);
+                }
+            }
+        }
+        
+        public void run()
+        {
+            for (int n = 0; n < _messages.length; ++n)
+            {
+                //
+                // if a topic is available, pass it along to the 
+                // topic listeners
+                //
+                if (_messages[n].getTopic() != null)
+                    processTopicListeners(_messages[n]);
+                
+                else
+                    processMessageListeners(_messages[n]);
             }
         }
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: muse-commits-unsubscribe@ws.apache.org
For additional commands, e-mail: muse-commits-help@ws.apache.org