You are viewing a plain text version of this content. The canonical link for it is here.
Posted to muse-commits@ws.apache.org by da...@apache.org on 2006/06/16 00:20:09 UTC

svn commit: r414696 [6/7] - in /webservices/muse/trunk/modules: muse-wsdm-muws-adv/ muse-wsdm-muws-adv/src-api/ muse-wsdm-muws-adv/src-api/org/ muse-wsdm-muws-adv/src-api/org/apache/ muse-wsdm-muws-adv/src-api/org/apache/muse/ muse-wsdm-muws-adv/src-ap...

Added: webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SimpleNotificationConsumer.java
URL: http://svn.apache.org/viewvc/webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SimpleNotificationConsumer.java?rev=414696&view=auto
==============================================================================
--- webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SimpleNotificationConsumer.java (added)
+++ webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SimpleNotificationConsumer.java Thu Jun 15 15:19:57 2006
@@ -0,0 +1,125 @@
+/*=============================================================================*
+ *  Copyright 2006 The Apache Software Foundation
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *=============================================================================*/
+
+package org.apache.muse.ws.notification.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.logging.Logger;
+
+import org.apache.muse.core.AbstractCapability;
+import org.apache.muse.core.serializer.SerializerRegistry;
+import org.apache.muse.util.LoggingUtils;
+import org.apache.muse.ws.addressing.soap.SoapFault;
+import org.apache.muse.ws.notification.NotificationConsumer;
+import org.apache.muse.ws.notification.NotificationMessage;
+import org.apache.muse.ws.notification.NotificationMessageListener;
+
+/**
+ *
+ * SimpleNotificationConsumer is Muse's default implementation of the 
+ * WS-Notification NotificationConsumer port type and the listener API 
+ * that augments it.
+ *
+ * @author Dan Jemiolo (danj)
+ *
+ */
+
+public class SimpleNotificationConsumer 
+    extends AbstractCapability implements NotificationConsumer
+{
+    //
+    // Registered listeners, in the order they were added. Every access is 
+    // synchronized on this capability because notify() hands delivery off 
+    // to a separate thread while listeners may still be added or removed.
+    //
+    private Collection _messageListeners = new ArrayList();
+    
+    /**
+     * 
+     * Appends a listener to the end of the delivery list.
+     *
+     * @param listener
+     *        The listener to offer incoming messages to.
+     *
+     */
+    public synchronized void addMessageListener(NotificationMessageListener listener)
+    {
+        _messageListeners.add(listener);
+    }
+    
+    /**
+     * 
+     * @return A read-only snapshot of the listeners registered at the time 
+     *         of the call, in registration order. Later additions or 
+     *         removals do not affect a snapshot that was already returned, 
+     *         so it can be iterated safely from any thread.
+     *
+     */
+    public synchronized Collection getMessageListeners()
+    {
+        return Collections.unmodifiableCollection(new ArrayList(_messageListeners));
+    }
+    
+    /**
+     * 
+     * Performs the base initialization and then registers a 
+     * NotificationMessageSerializer for the NotificationMessage type.
+     *
+     */
+    public void initialize() 
+        throws SoapFault
+    {
+        super.initialize();
+        
+        SerializerRegistry registry = SerializerRegistry.getInstance();
+        registry.registerSerializer(NotificationMessage.class, new NotificationMessageSerializer());
+    }
+    
+    /**
+     * 
+     * {@inheritDoc}
+     * <br><br>
+     * This implementation returns immediately: the messages are handed to 
+     * a new thread, which offers each message, in array order, to every 
+     * registered listener in the order the listeners were added. A listener 
+     * only processes a message that it accepts.
+     * 
+     */
+    public void notify(NotificationMessage[] messages)
+    {
+        NotifyThread thread = new NotifyThread(messages);
+        thread.start();
+    }
+    
+    /**
+     * 
+     * Removes a previously added listener.
+     *
+     * @param listener
+     *        The listener to remove.
+     *
+     */
+    public synchronized void removeMessageListener(NotificationMessageListener listener)
+    {
+        _messageListeners.remove(listener);
+    }
+    
+    /**
+     * 
+     * Delivers one batch of messages to the listeners of the enclosing 
+     * consumer.
+     *
+     */
+    private class NotifyThread extends Thread
+    {
+        private NotificationMessage[] _messages = null;
+        
+        public NotifyThread(NotificationMessage[] messages)
+        {
+            _messages = messages;
+        }
+        
+        public void run()
+        {
+            for (int n = 0; n < _messages.length; ++n)
+            {            
+                //
+                // getMessageListeners() returns a snapshot, so listeners 
+                // being added/removed on other threads cannot break this 
+                // iteration; changes are picked up for the next message
+                //
+                Iterator i = getMessageListeners().iterator();
+                
+                while (i.hasNext())
+                {
+                    NotificationMessageListener listener = (NotificationMessageListener)i.next();
+                    
+                    try
+                    {
+                        if (listener.accepts(_messages[n]))
+                            listener.process(_messages[n]);
+                    }
+                    
+                    //
+                    // one faulty listener must not prevent the remaining 
+                    // listeners from seeing the message
+                    //
+                    catch (Throwable error)
+                    {
+                        Logger log = getLog();
+                        LoggingUtils.logError(log, error); // FIXME: extract message
+                        log.info("The last listener faulted while processing the current message - continuing to other listeners.");
+                    }
+                }
+            }
+        }
+    }
+}

Added: webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SimpleNotificationMessage.java
URL: http://svn.apache.org/viewvc/webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SimpleNotificationMessage.java?rev=414696&view=auto
==============================================================================
--- webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SimpleNotificationMessage.java (added)
+++ webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SimpleNotificationMessage.java Thu Jun 15 15:19:57 2006
@@ -0,0 +1,324 @@
+/*=============================================================================*
+ *  Copyright 2006 The Apache Software Foundation
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *=============================================================================*/
+
+package org.apache.muse.ws.notification.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import javax.xml.namespace.QName;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+import org.apache.muse.core.serializer.Serializer;
+import org.apache.muse.core.serializer.SerializerRegistry;
+import org.apache.muse.util.messages.Messages;
+import org.apache.muse.util.messages.MessagesFactory;
+import org.apache.muse.util.xml.XmlSerializable;
+import org.apache.muse.util.xml.XmlUtils;
+import org.apache.muse.ws.addressing.EndpointReference;
+import org.apache.muse.ws.notification.NotificationMessage;
+import org.apache.muse.ws.notification.WsnConstants;
+import org.apache.muse.ws.notification.topics.WstConstants;
+import org.apache.muse.ws.addressing.soap.SoapFault;
+
+/**
+ *
+ * SimpleNotificationMessage is a simple container that represents the WS-Notification 
+ * SimpleNotificationMessage type. It provides serialization capabilities for these 
+ * messages and validation of their contents.
+ *
+ * @author Dan Jemiolo (danj)
+ *
+ */
+
+public class SimpleNotificationMessage implements XmlSerializable, NotificationMessage
+{
+    private static Messages _MESSAGES = MessagesFactory.get(SimpleNotificationMessage.class);
+    
+    private Map _messageContent = new LinkedHashMap();
+    
+    private EndpointReference _producer = null;
+    
+    private EndpointReference _subscription = null;
+    
+    private QName _topicPath = null;
+    
+    /**
+     * 
+     * The default constructor provides no initialization, allowing users 
+     * to create messages in an 'assembly line' system, where pieces of 
+     * the messages are added by different components (e.g., GLA, Muse) 
+     * before being published. 
+     *
+     */
+    public SimpleNotificationMessage() 
+    {
+        //
+        // no initialization tasks - this allows for assembly line-like 
+        // creation of messages
+        //
+    }
+    
+    /**
+     *
+     * Creates a message from its XML form: reads the topic, the optional 
+     * subscription and producer references, and every child element of the 
+     * Message wrapper element.
+     *
+     * @param root
+     *        A DOM Element representing a WS-N SimpleNotificationMessage
+     * 
+     * @throws NullPointerException
+     *         If the root element is null.
+     * 
+     * @throws SoapFault
+     *         <ul>
+     *         <li>If no topic can be read from the XML.</li>
+     *         <li>If the XML has no Message element (it may be empty, but 
+     *         it must be present).</li>
+     *         <li>If a subscription or producer reference is present but 
+     *         cannot be parsed as an EndpointReference.</li>
+     *         </ul>
+     *
+     */
+    public SimpleNotificationMessage(Element root)
+        throws SoapFault
+    {
+        if (root == null)
+            throw new NullPointerException(_MESSAGES.get("NullMessageElement"));
+        
+        //
+        // toXML() writes the subscription reference, so read it back here - 
+        // otherwise it is silently lost when a message is parsed
+        //
+        Element subscriptionXML = XmlUtils.getElement(root, WsnConstants.SUBSCRIPTION_EPR_QNAME);
+        
+        if (subscriptionXML != null)
+            _subscription = new EndpointReference(subscriptionXML);
+        
+        _topicPath = XmlUtils.getQNameFromChild(root, WsnConstants.TOPIC_QNAME);
+        
+        if (_topicPath == null)
+            throw new SoapFault(_MESSAGES.get("NoTopicPath"));
+        
+        Element producerXML = XmlUtils.getElement(root, WsnConstants.PRODUCER_QNAME);
+        
+        if (producerXML != null)
+            _producer = new EndpointReference(producerXML);
+        
+        Element messageXML = XmlUtils.getElement(root, WsnConstants.MESSAGE_QNAME);
+        
+        //
+        // we have to have a Message element, even if it's empty
+        //
+        if (messageXML == null)
+            throw new SoapFault(_MESSAGES.get("NoMessageContent"));
+        
+        Element[] children = XmlUtils.getAllElements(messageXML);
+        
+        for (int n = 0; n < children.length; ++n)
+            addMessageContent(children[n]);
+    }
+    
+    /**
+     * 
+     * Creates an empty message for the given topic. No producer, 
+     * subscription or content is set.
+     *
+     * @param topicPath
+     *        The topic of the message. Unlike setTopic(), this constructor 
+     *        performs no null check, so a null value is stored as-is.
+     *
+     */
+    public SimpleNotificationMessage(QName topicPath)
+    {
+        _topicPath = topicPath;
+    }
+    
+    public void addMessageContent(Element content)
+    {
+        if (content == null)
+            throw new NullPointerException(_MESSAGES.get("NullMessageContent"));
+
+        QName name = XmlUtils.getElementQName(content);
+        _messageContent.put(name, content);
+    }
+    
+    /**
+     * 
+     * This method allows you to add message content that has not yet been 
+     * serialized into XML. The content object must be of a type that has 
+     * a Muse Serializer registered for it; these types include all of 
+     * the base/built-in types plus those that had serializers registered 
+     * for them in touchpoint.xml.
+     *
+     * @param content
+     * @param qname
+     * 
+     * @throws SoapFault
+     * 
+     * @see Serializer
+     *
+     */
+    public void addMessageContent(QName qname, Object content)
+        throws SoapFault
+    {
+        if (qname == null)
+            throw new NullPointerException(_MESSAGES.get("NullContentName"));
+
+        if (content == null)
+            throw new NullPointerException(_MESSAGES.get("NullMessageContent"));
+        
+        SerializerRegistry registry = SerializerRegistry.getInstance();
+        Serializer ser = registry.getSerializer(content.getClass());
+        Element xml = ser.toXML(content, qname);
+        
+        _messageContent.put(qname, xml);
+    }
+    
+    public Element getMessageContent(QName qname)
+    {
+        return (Element)_messageContent.get(qname);
+    }
+    
+    /**
+     * 
+     * Reads a piece of message content as a Java object instead of XML, 
+     * using the Muse Serializer registered for the requested type. A 
+     * Serializer must be registered for that type; these types include all 
+     * of the base/built-in types plus those that had serializers registered 
+     * for them in touchpoint.xml.
+     * <br><br>
+     * NOTE(review): when no content is stored under the name, null is 
+     * passed to the serializer - confirm how Serializer.fromXML() treats 
+     * a null element.
+     *
+     * @param qname
+     *        The name the content was stored under.
+     * 
+     * @param type
+     *        The class whose registered Serializer should parse the XML.
+     * 
+     * @return The object produced by the Serializer.
+     *  
+     * @see Serializer
+     *
+     */
+    public Object getMessageContent(QName qname, Class type)
+        throws SoapFault
+    {
+        Element xml = getMessageContent(qname);
+        Serializer deserializer = SerializerRegistry.getInstance().getSerializer(type);
+        return deserializer.fromXML(xml);
+    }
+    
+    public Collection getMessageContentNames()
+    {
+        return Collections.unmodifiableSet(_messageContent.keySet());
+    }
+    
+    public EndpointReference getProducerReference()
+    {
+        return _producer;
+    }
+    
+    public EndpointReference getSubscriptionReference()
+    {
+        return _subscription;
+    }
+    
+    public QName getTopic()
+    {
+        return _topicPath;
+    }
+    
+    /**
+     *
+     * @return The concrete expression dialect
+     * 
+     * @see WstConstants#CONCRETE_TOPIC_URI
+     *
+     */
+    public String getTopicDialect()
+    {
+        return WstConstants.CONCRETE_TOPIC_URI;
+    }
+    
+    /**
+     * 
+     * Sets or clears the reference to the resource that produced this 
+     * message.
+     *
+     * @param producer
+     *        The producer's EPR, or null to clear it. A non-null value is 
+     *        not stored directly: a new EndpointReference is built from it 
+     *        together with the WS-N producer QName (presumably so that it 
+     *        serializes under that element name - confirm against 
+     *        EndpointReference).
+     *
+     */
+    public void setProducerReference(EndpointReference producer)
+    {
+        if (producer != null)
+            _producer = new EndpointReference(producer, WsnConstants.PRODUCER_QNAME);
+        
+        else
+            _producer = null;
+    }
+    
+    public void setSubscriptionReference(EndpointReference subscription)
+    {
+        _subscription = subscription;
+    }
+    
+    public void setTopic(QName topicPath)
+    {
+        if (topicPath == null)
+            throw new NullPointerException(_MESSAGES.get("NullTopicPath"));
+        
+        _topicPath = topicPath;
+    }
+    
+    /**
+     * 
+     * Validates the requested dialect. Nothing is stored: this class only 
+     * supports the concrete topic dialect, which getTopicDialect() always 
+     * reports.
+     *
+     * @param dialect
+     *        The dialect URI; must equal WstConstants.CONCRETE_TOPIC_URI.
+     * 
+     * @throws UnsupportedOperationException
+     *         If the dialect is null or any other value.
+     *
+     */
+    public void setTopicDialect(String dialect)
+    {
+        //
+        // compare constant-first so that a null dialect is reported as an 
+        // invalid dialect rather than as a message-less NullPointerException
+        //
+        if (!WstConstants.CONCRETE_TOPIC_URI.equals(dialect)) // FIXME: message
+            throw new UnsupportedOperationException("Invalid topic dialect");
+    }
+
+    public String toString()
+    {
+        return XmlUtils.toString(toXML(), false);
+    }
+
+    public Element toXML()
+    {
+        return toXML(XmlUtils.EMPTY_DOC);
+    }
+
+    /**
+     * 
+     * Builds the XML form of this message with the given document as the 
+     * owner of all created nodes. The children are written in this order: 
+     * subscription reference (if set), topic with its dialect attribute 
+     * (if set), producer reference (if set), and the Message wrapper 
+     * holding a deep copy of each piece of content.
+     *
+     * @param doc
+     *        The document used to create and import the elements.
+     * 
+     * @return The root element of the serialized message.
+     * 
+     * @throws NullPointerException
+     *         If the document is null.
+     *
+     */
+    public Element toXML(Document doc)
+    {
+        if (doc == null)
+            throw new NullPointerException(_MESSAGES.get("NullDocument"));
+        
+        Element notification = XmlUtils.createElement(doc, WsnConstants.NOTIFICATION_MSG_QNAME);
+        
+        EndpointReference subscriptionEPR = getSubscriptionReference();
+        
+        if (subscriptionEPR != null)
+            XmlUtils.setElement(notification, WsnConstants.SUBSCRIPTION_EPR_QNAME, subscriptionEPR.toXML(doc));
+        
+        QName topicName = getTopic();
+        
+        if (topicName != null)
+        {
+            Element topicXML = XmlUtils.createElement(doc, WsnConstants.TOPIC_QNAME, topicName);
+            topicXML.setAttribute(WsnConstants.DIALECT, getTopicDialect());
+            notification.appendChild(topicXML);
+        }
+        
+        EndpointReference producerEPR = getProducerReference();
+        
+        if (producerEPR != null)
+            XmlUtils.setElement(notification, WsnConstants.PRODUCER_QNAME, producerEPR.toXML(doc));
+        
+        //
+        // the wrapper is always written, even when there is no content
+        //
+        Element wrapper = XmlUtils.createElement(doc, WsnConstants.MESSAGE_QNAME);
+        notification.appendChild(wrapper);
+        
+        for (Iterator names = getMessageContentNames().iterator(); names.hasNext(); )
+        {
+            Element stored = getMessageContent((QName)names.next());
+            
+            //
+            // deep-copy into the target document; the stored element may 
+            // belong to a different one
+            //
+            Element copy = (Element)doc.importNode(stored, true);
+            wrapper.appendChild(copy);
+        }
+        
+        //
+        // declare every namespace used in the message on the root element 
+        // so we can easily query them with XPath 1.0
+        //
+        Map declarations = XmlUtils.getAllNamespaces(notification);
+        
+        for (Iterator entries = declarations.entrySet().iterator(); entries.hasNext(); )
+        {
+            Map.Entry declaration = (Map.Entry)entries.next();
+            String prefix = (String)declaration.getKey();
+            String namespace = (String)declaration.getValue();
+            XmlUtils.setNamespaceAttribute(notification, prefix, namespace);
+        }
+        
+        return notification;
+    }
+}

Added: webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SimpleNotificationProducer.java
URL: http://svn.apache.org/viewvc/webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SimpleNotificationProducer.java?rev=414696&view=auto
==============================================================================
--- webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SimpleNotificationProducer.java (added)
+++ webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SimpleNotificationProducer.java Thu Jun 15 15:19:57 2006
@@ -0,0 +1,484 @@
+/*=============================================================================*
+ *  Copyright 2006 The Apache Software Foundation
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *=============================================================================*/
+
+package org.apache.muse.ws.notification.impl;
+
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import javax.xml.namespace.QName;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+import org.apache.muse.core.Resource;
+import org.apache.muse.core.ResourceManager;
+import org.apache.muse.core.ResourceManagerListener;
+import org.apache.muse.core.routing.MessageHandler;
+import org.apache.muse.util.ReflectUtils;
+import org.apache.muse.util.messages.Messages;
+import org.apache.muse.util.messages.MessagesFactory;
+import org.apache.muse.util.xml.XmlUtils;
+import org.apache.muse.ws.addressing.EndpointReference;
+import org.apache.muse.ws.resource.basefaults.BaseFault;
+import org.apache.muse.ws.notification.*;
+import org.apache.muse.ws.notification.faults.*;
+import org.apache.muse.ws.notification.properties.*;
+import org.apache.muse.ws.notification.topics.*;
+import org.apache.muse.ws.notification.topics.impl.*;
+import org.apache.muse.ws.resource.WsResource;
+import org.apache.muse.ws.resource.impl.AbstractWsResourceCapability;
+import org.apache.muse.ws.resource.lifetime.ScheduledTermination;
+import org.apache.muse.ws.resource.lifetime.WsrlConstants;
+import org.apache.muse.ws.resource.properties.ResourcePropertyCollection;
+import org.apache.muse.ws.resource.properties.WsrpConstants;
+import org.apache.muse.ws.resource.properties.listeners.PropertyChangeListener;
+import org.apache.muse.ws.addressing.soap.SoapFault;
+
+/**
+ *
+ * SimpleNotificationProducer is Muse's default implementation of the 
+ * WS-Notification NotificationProducer port type.
+ * <br><br>
+ * Resources that implement this port type should be deployed with the 
+ * {@linkplain SubscriptionManager SubscriptionManager} type as well; the subscription 
+ * resources cannot be managed without this endpoint.
+ *
+ * @author Dan Jemiolo (danj)
+ *
+ */
+
+public class SimpleNotificationProducer 
+    extends AbstractWsResourceCapability implements NotificationProducer, ResourceManagerListener
+{
+    //
+    // Used to lookup all exception messages
+    //
+    private static Messages _MESSAGES = 
+        MessagesFactory.get(SimpleNotificationProducer.class);
+    
+    private static final String _TOPIC_SET_PARAM = "topic-set";
+    
+    private Set _allTopicNames = new HashSet();
+    
+    private ChangeNotificationListenerFactory _listenerFactory = null;
+    
+    private String _subscriptionPath = null;
+    
+    private Map _subscriptionsByEPR = new HashMap();
+    
+    private TopicSet _topicSet = new SimpleTopicSet();
+    
+    protected synchronized void addSubscription(Resource sub)
+    {
+        SubscriptionManager subMgr = 
+            (SubscriptionManager)sub.getCapability(WsnConstants.SUBSCRIPTION_MGR_URI);
+        EndpointReference epr = sub.getEndpointReference();
+        _subscriptionsByEPR.put(epr, subMgr);
+    }
+    
+    /**
+     * 
+     * Creates a topic with the given name and adds it to the topic 
+     * namespace matching the name's namespace URI, creating that topic 
+     * namespace first if this producer does not have one yet.
+     * <br><br>
+     * This method does not record the name in the collection that backs 
+     * hasTopic() and getTopicExpression().
+     *
+     * @param topicName
+     *        The namespace and local name of the new topic.
+     * 
+     * @return The newly created topic.
+     *
+     */
+    public Topic addTopic(QName topicName)
+        throws BaseFault
+    {
+        String uri = topicName.getNamespaceURI();
+        TopicNamespace space = getTopicNamespace(uri);
+        
+        //
+        // first topic in this namespace - the topic space is created lazily
+        //
+        if (space == null)
+            space = addTopicNamespace(uri);
+        
+        Topic created = new SimpleTopic(topicName.getLocalPart(), space);
+        space.addTopic(created);
+        
+        return created;
+    }
+    
+    /**
+     * 
+     * Adds a topic named after the given element's local name, then 
+     * recurses into every child element. All topics of the tree are 
+     * created with the same namespace and prefix.
+     *
+     * @param topicTree
+     *        The element whose local name (and descendants) define topics.
+     * 
+     * @param namespace
+     *        The namespace URI to use for every topic in the tree.
+     * 
+     * @param prefix
+     *        The prefix to use for every topic in the tree. May be null 
+     *        (as DOM reports for unprefixed elements), in which case the 
+     *        empty default prefix is used.
+     *
+     */
+    protected void addTopics(Element topicTree, 
+                             String namespace, 
+                             String prefix) 
+        throws BaseFault
+    {
+        //
+        // DOM reports "no prefix" as null, but QName's three-argument 
+        // constructor rejects a null prefix with an IllegalArgumentException
+        //
+        if (prefix == null)
+            prefix = "";
+        
+        String name = topicTree.getLocalName();
+        addTopic(new QName(namespace, name, prefix));
+        
+        Element[] children = XmlUtils.getAllElements(topicTree);
+        
+        for (int n = 0; n < children.length; ++n)
+            addTopics(children[n], namespace, prefix);
+    }
+    
+    public TopicNamespace addTopicNamespace(String namespace)
+        throws BaseFault
+    {
+        TopicNamespace topics = new SimpleTopicNamespace(namespace);
+        topics.setName(WsrpConstants.TOPIC_SPACE_NAME);
+        
+        _topicSet.addTopicNamespace(topics);
+        
+        return topics;
+    }
+    
+    protected ChangeNotificationListenerFactory createNotificationListenerFactory()
+    {
+        return new WsrpNotificationListenerFactory();
+    }
+    
+    protected MessageHandler createSubscribeHandler()
+    {
+        MessageHandler handler = new SubscribeHandler(getWsResource());
+        
+        Method method = ReflectUtils.getFirstMethod(getClass(), "subscribe");
+        handler.setMethod(method);
+        
+        return handler;
+    }
+    
+    /**
+     * 
+     * Loads the given topic set file through the environment and adds 
+     * topics for each child element of its first element, using each 
+     * child's own namespace URI and prefix for its whole tree.
+     *
+     * @param topicSetFile
+     *        The path of the topic set document, as understood by the 
+     *        environment's getDocument().
+     *
+     */
+    protected void createTopicSetDocument(String topicSetFile)
+        throws BaseFault
+    {
+        Element root = XmlUtils.getFirstElement(getEnvironment().getDocument(topicSetFile));
+        Element[] trees = XmlUtils.getAllElements(root);
+        
+        for (int t = 0; t < trees.length; ++t)
+        {
+            Element tree = trees[t];
+            addTopics(tree, tree.getNamespaceURI(), tree.getPrefix());
+        }
+    }
+    
+    /**
+     * 
+     * @param topicPath
+     *        The name of the topic to query.
+     * 
+     * @return The current message of the topic, as reported by the topic.
+     * 
+     * @throws TopicNotSupportedFault
+     *         If this producer has no topic with the given name.
+     * 
+     * @throws NoCurrentMessageOnTopicFault
+     *         If the topic exists but has no current message.
+     *
+     */
+    public NotificationMessage getCurrentMessage(QName topicPath)
+        throws NoCurrentMessageOnTopicFault, 
+               TopicNotSupportedFault
+    {
+        //
+        // both error messages are parameterized with the topic name
+        //
+        Object[] filler = { topicPath };
+        
+        Topic topic = getTopic(topicPath);
+        
+        if (topic == null)
+            throw new TopicNotSupportedFault(_MESSAGES.get("TopicNotFound", filler));
+        
+        NotificationMessage current = topic.getCurrentMessage();
+        
+        if (current == null)
+            throw new NoCurrentMessageOnTopicFault(_MESSAGES.get("NoMessageAvailable", filler));
+        
+        return current;
+    }
+    
+    public boolean getFixedTopicSet()
+    {
+        return true;
+    }
+    
+    protected ChangeNotificationListenerFactory getNotificationListenerFactory()
+    {
+        return _listenerFactory;
+    }
+    
+    public QName[] getPropertyNames()
+    {
+        return PROPERTIES;
+    }
+    
+    protected String getSubscriptionContextPath()
+    {
+        return _subscriptionPath;
+    }
+    
+    /**
+     * 
+     * @return A read-only view of the SubscriptionManager capabilities 
+     *         currently registered with this producer. The view is backed 
+     *         by the live subscription map; only the creation of the view 
+     *         happens under this object's lock, so callers that iterate it 
+     *         should hold the same lock (as publish() does) if 
+     *         subscriptions can be added or removed concurrently.
+     *
+     */
+    protected synchronized Collection getSubscriptions()
+    {
+        return Collections.unmodifiableCollection(_subscriptionsByEPR.values());
+    }
+    
+    /**
+     * 
+     * @param topicName
+     *        The namespace and local name of the topic to look up.
+     * 
+     * @return The result of looking up the local name in the topic 
+     *         namespace for the name's namespace URI, or null if this 
+     *         producer has no such topic namespace.
+     *
+     */
+    public Topic getTopic(QName topicName)
+    {
+        TopicNamespace space = getTopicNamespace(topicName.getNamespaceURI());
+        return space == null ? null : space.getTopic(topicName.getLocalPart());
+    }
+    
+    public QName[] getTopicExpression()
+    {
+        QName[] array = new QName[_allTopicNames.size()];
+        return (QName[])_allTopicNames.toArray(array);
+    }
+    
+    public URI[] getTopicExpressionDialect()
+    {
+        return new URI[]{ URI.create(WstConstants.CONCRETE_TOPIC_URI) };
+    }
+    
+    public TopicNamespace getTopicNamespace(String namespace)
+    {
+        return getTopicSet().getTopicNamespace(namespace);
+    }
+
+    public TopicSet getTopicSet()
+    {
+        return _topicSet;
+    }
+
+    /**
+     * 
+     * @param topicName
+     *        The topic name to test.
+     * 
+     * @return True if the name was recorded in this producer's set of topic 
+     *         names. Within this class, only initializeCompleted() records 
+     *         names (resource property topics and the termination topic); 
+     *         a topic created by calling addTopic() directly is not 
+     *         reflected here unless the caller records it as well.
+     *
+     */
+    public boolean hasTopic(QName topicName)
+    {
+        return _allTopicNames.contains(topicName);
+    }
+    
+    /**
+     * 
+     * Prepares the producer: creates the change listener factory, installs 
+     * the handler for subscribe requests, looks up the context path of the 
+     * subscription resource type, registers this object as a listener on 
+     * the ResourceManager and, if the topic-set initialization parameter is 
+     * present, loads the topics defined in that file.
+     *
+     * @throws RuntimeException
+     *         If the ResourceManager has no context path for the 
+     *         SubscriptionManager type.
+     *
+     */
+    public void initialize()
+        throws SoapFault
+    {
+        super.initialize();
+        
+        _listenerFactory = createNotificationListenerFactory();
+        
+        setMessageHandler(createSubscribeHandler());
+        
+        //
+        // make sure we're exposing the subscription endpoint so that 
+        // clients can manage subscriptions/consumers
+        //
+        WsResource resource = getWsResource();
+        ResourceManager manager = resource.getResourceManager();
+        _subscriptionPath = manager.getResourceContextPath(SubscriptionManager.class);
+
+        if (_subscriptionPath == null)
+            throw new RuntimeException(_MESSAGES.get("NoSubscriptionManager"));
+        
+        //
+        // make sure we can listen for new subscriptions/destructions
+        //
+        manager.addListener(this);
+        
+        //
+        // the topic-set parameter is optional; when it is present, the 
+        // topics defined in that file are added to this producer
+        //
+        String topicSetFile = getInitializationParameter(_TOPIC_SET_PARAM);
+        
+        if (topicSetFile != null)
+            createTopicSetDocument(topicSetFile);
+    }
+    
+    public void initializeCompleted()
+        throws SoapFault
+    {
+        super.initializeCompleted();
+        
+        ChangeNotificationListenerFactory factory = getNotificationListenerFactory();
+        
+        WsResource resource = getWsResource();
+        ResourcePropertyCollection props = resource.getPropertyCollection();
+        Iterator i = props.getPropertyNames().iterator();
+        
+        while (i.hasNext())
+        {
+            QName name = (QName)i.next();
+            
+            addTopic(name);
+            _allTopicNames.add(name);
+            
+            PropertyChangeListener listener = factory.newInstance(name, resource);
+            props.addChangeListener(listener);
+        }
+        
+        if (resource.hasCapability(WsrlConstants.IMMEDIATE_TERMINATION_URI) || 
+            resource.hasCapability(WsrlConstants.SCHEDULED_TERMINATION_URI))
+        {
+            addTopic(WsrlConstants.TERMINATION_TOPIC_QNAME);
+            _allTopicNames.add(WsrlConstants.TERMINATION_TOPIC_QNAME);
+        }
+    }
+    
+    /**
+     * 
+     * If this producer has the WS-RL termination topic, publishes a final 
+     * message on it - carrying this resource's EPR as the producer and the 
+     * current time as the termination time - and then delegates to the 
+     * base class.
+     *
+     */
+    public void prepareShutdown() 
+        throws SoapFault
+    {
+        QName terminationTopic = WsrlConstants.TERMINATION_TOPIC_QNAME;
+        
+        //
+        // the topic only exists when WSRL is present; subscribers have to 
+        // be told before all subscriptions are destroyed
+        //
+        if (getTopic(terminationTopic) != null)
+        {
+            NotificationMessage farewell = new SimpleNotificationMessage(terminationTopic);
+            farewell.setProducerReference(getWsResource().getEndpointReference());
+            
+            Element body = XmlUtils.createElement(WsrlConstants.NOTIFICATION_QNAME);
+            XmlUtils.setElement(body, WsrlConstants.TERMINATION_TIME_QNAME, new Date());
+            farewell.addMessageContent(body);
+            
+            publish(farewell);
+        }
+        
+        super.prepareShutdown();
+    }
+    
+    /**
+     * 
+     * Hands the message to every registered subscription, one after the 
+     * other, while holding this producer's lock. A SoapFault thrown by a 
+     * subscription propagates immediately, so subscriptions later in the 
+     * iteration are not given the message.
+     *
+     * @param message
+     *        The message to pass to each subscription's publish().
+     *
+     */
+    public synchronized void publish(NotificationMessage message)
+        throws SoapFault
+    {
+        for (Iterator subs = getSubscriptions().iterator(); subs.hasNext(); )
+        {
+            SubscriptionManager next = (SubscriptionManager)subs.next();
+            next.publish(message);
+        }
+    }
+    
+    protected synchronized void removeSubscription(EndpointReference epr)
+    {
+        _subscriptionsByEPR.remove(epr);
+    }
+
+    /**
+     * 
+     * ResourceManager callback: registers the new resource as a 
+     * subscription if it has the subscription manager capability; all 
+     * other resources are ignored.
+     *
+     */
+    public void resourceAdded(EndpointReference epr, Resource resource)
+    {
+        //
+        // this method itself is not synchronized, but addSubscription() 
+        // is (on the same lock as publish()), so if we're publishing, the 
+        // collection will not be updated until afterwards
+        //
+        if (resource.hasCapability(WsnConstants.SUBSCRIPTION_MGR_URI))
+            addSubscription(resource);
+    }
+
+    /**
+     * 
+     * ResourceManager callback: drops the subscription registered under 
+     * the given EPR. The removal is attempted for every removed resource; 
+     * an EPR with no registered subscription leaves the map unchanged.
+     *
+     */
+    public void resourceRemoved(EndpointReference epr)
+    {
+        //
+        // this method itself is not synchronized, but removeSubscription() 
+        // is (on the same lock as publish()), so if we're publishing, the 
+        // collection will not be updated until afterwards
+        //
+        removeSubscription(epr);
+    }
+
+    /**
+     * 
+     * Creates a subscription resource for the given consumer, configures 
+     * its SubscriptionManager capability (producer, consumer, filter and 
+     * policy), initializes it and adds it to the ResourceManager. If the 
+     * new resource supports scheduled termination, the termination time is 
+     * applied after initialization.
+     *
+     * @param consumer
+     *        The EPR that messages should be sent to.
+     * 
+     * @param filter
+     *        The filter for the subscription; if null, a filter that 
+     *        publishes all messages is used.
+     * 
+     * @param terminationTime
+     *        The value passed to the subscription's scheduled termination 
+     *        capability, if it has one.
+     * 
+     * @param policy
+     *        The subscription policy, passed on unchanged.
+     * 
+     * @return The new subscription resource.
+     * 
+     * @throws NullPointerException
+     *         If the consumer EPR is null.
+     * 
+     * @throws TopicNotSupportedFault
+     *         If a topic filter names a topic this producer does not have.
+     * 
+     * @throws SubscribeCreationFailedFault
+     *         If the subscription resource cannot be created, initialized 
+     *         or added to the ResourceManager.
+     * 
+     * @throws UnacceptableInitialTerminationTimeFault
+     *         If the termination time is rejected.
+     *
+     */
+    public WsResource subscribe(EndpointReference consumer, 
+                                Filter filter, 
+                                Date terminationTime, 
+                                Policy policy) 
+        throws TopicNotSupportedFault, 
+               UnacceptableInitialTerminationTimeFault,
+               SubscribeCreationFailedFault
+    {
+        if (consumer == null)
+            throw new NullPointerException(_MESSAGES.get("NullConsumerEPR"));
+        
+        if (filter == null)
+            filter = PublishAllMessagesFilter.getInstance();
+        
+        //
+        // HACK: ugh. topic filters are different from message 
+        //       pattern and properties filters because they 
+        //       rely on internal data structures (topics) that 
+        //       are more concretely defined than the other two 
+        //       filter types. it's a one-off. we have to check 
+        //       against the topic set of the resource, and there 
+        //       is no such equivalent task for other filter types. 
+        //       so we downcast.
+        //
+        if (filter instanceof TopicFilter)
+        {
+            TopicFilter topicFilter = (TopicFilter)filter;
+            QName topic = topicFilter.getTopic();
+            
+            //
+            // reject the subscription only when the topic is NOT one of 
+            // ours. hasTopic() only knows the names recorded in the topic 
+            // name set, while addTopic() (used for topic-set files) only 
+            // updates the topic set - so a topic found by either lookup 
+            // counts as supported.
+            //
+            boolean supported = 
+                topic != null && (hasTopic(topic) || getTopic(topic) != null);
+            
+            if (!supported)
+                throw new TopicNotSupportedFault("Topic not found: " + topic);
+        }
+        
+        WsResource producer = getWsResource();
+        ResourceManager manager = producer.getResourceManager();
+        
+        //
+        // create the resource to represent the subscription
+        //
+        String endpoint = getSubscriptionContextPath();
+        WsResource sub = null;
+        
+        try
+        {
+            sub = (WsResource)manager.createResource(endpoint);
+        }
+        
+        catch (SoapFault error)
+        {
+            throw new SubscribeCreationFailedFault(error);
+        }
+        
+        SubscriptionManager subMgr = 
+            (SubscriptionManager)sub.getCapability(WsnConstants.SUBSCRIPTION_MGR_URI);
+        
+        EndpointReference producerEPR = producer.getEndpointReference();
+        subMgr.setProducerReference(producerEPR);
+        
+        subMgr.setConsumerReference(consumer);
+        subMgr.setFilter(filter);
+        subMgr.setSubscriptionPolicy(policy);
+        
+        try
+        {
+            sub.initialize();
+            manager.addResource(sub.getEndpointReference(), sub);
+        }
+        
+        catch (SoapFault error)
+        {
+            throw new SubscribeCreationFailedFault(error);            
+        }
+        
+        //
+        // set termination time AFTER we initialize so that we can rely 
+        // on the base Resource class' implementation of WS-RL (which 
+        // tries to apply the change immediately, rather than storing it 
+        // in a field and setting it during initialization)
+        //
+        if (sub.hasCapability(WsrlConstants.SCHEDULED_TERMINATION_URI))
+        {
+            ScheduledTermination wsrl = (ScheduledTermination)sub.getCapability(WsrlConstants.SCHEDULED_TERMINATION_URI);
+            
+            try
+            {
+                wsrl.setTerminationTime(terminationTime);
+            }
+            
+            catch (BaseFault error)
+            {
+                throw new UnacceptableInitialTerminationTimeFault(error);
+            }
+        }
+        
+        return sub;
+    }
+}

Added: webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SimplePullPoint.java
URL: http://svn.apache.org/viewvc/webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SimplePullPoint.java?rev=414696&view=auto
==============================================================================
--- webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SimplePullPoint.java (added)
+++ webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SimplePullPoint.java Thu Jun 15 15:19:57 2006
@@ -0,0 +1,101 @@
+/*=============================================================================*
+ *  Copyright 2006 The Apache Software Foundation
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *=============================================================================*/
+
+package org.apache.muse.ws.notification.impl;
+
+import org.apache.muse.ws.addressing.EndpointReference;
+import org.apache.muse.ws.addressing.soap.SoapFault;
+import org.apache.muse.ws.notification.Filter;
+import org.apache.muse.ws.notification.NotificationConsumer;
+import org.apache.muse.ws.notification.NotificationMessage;
+import org.apache.muse.ws.notification.PullPoint;
+import org.apache.muse.ws.notification.PullPointDataStore;
+import org.apache.muse.ws.notification.WsnConstants;
+import org.apache.muse.ws.notification.faults.UnableToGetMessagesFault;
+import org.apache.muse.ws.resource.WsResource;
+import org.apache.muse.ws.resource.impl.AbstractWsResourceCapability;
+
+
+/**
+ *
+ * SimplePullPoint is an implementation of the PullPoint capability. During 
+ * initialization it registers itself as a message listener with the 
+ * NotificationConsumer capability of its resource; every message it accepts 
+ * is put in a {@linkplain PullPointDataStore data store} until it is 
+ * retrieved with getMessages().
+ *
+ */
+
+public class SimplePullPoint 
+    extends AbstractWsResourceCapability implements PullPoint
+{
+    //
+    // Holds the messages that have been received but not yet pulled
+    //
+    private PullPointDataStore _dataStore = null;
+    
+    //
+    // The subscription that feeds this pull point - set after creation
+    //
+    private WsResource _subscription = null;
+    
+    /**
+     * 
+     * @param message
+     *        The message that was delivered to the consumer.
+     * 
+     * @return True if the message carries a subscription EPR that is equal 
+     *         to the EPR of this pull point's subscription. False if the 
+     *         message has no subscription EPR or if no subscription has 
+     *         been set yet.
+     * 
+     */
+    public boolean accepts(NotificationMessage message)
+    {
+        EndpointReference subEPR = message.getSubscriptionReference();
+        WsResource subscription = getSubscription();
+        
+        if (subEPR == null || subscription == null)
+            return false;
+        
+        //
+        // compare EPR to EPR - the subscription resource itself is never 
+        // equal to an EndpointReference
+        //
+        return subEPR.equals(subscription.getEndpointReference());
+    }
+    
+    /**
+     * 
+     * <b>Plug-In Point:</b> Factory method for the data store that is 
+     * created during initialize(). It can be overridden to provide a 
+     * different PullPointDataStore implementation.
+     * 
+     * @return This implementation returns a new SimplePullPointDataStore.
+     * 
+     */
+    protected PullPointDataStore createDataStore()
+    {
+        return new SimplePullPointDataStore();
+    }
+
+    /**
+     * 
+     * @return The data store created by initialize(), or null if 
+     *         initialize() has not been called.
+     * 
+     */
+    public PullPointDataStore getDataStore()
+    {
+        return _dataStore;
+    }
+    
+    /**
+     * 
+     * @return This implementation always returns the shared 
+     *         PublishAllMessagesFilter instance.
+     * 
+     */
+    public Filter getFilter()
+    {
+        return PublishAllMessagesFilter.getInstance();
+    }
+
+    /**
+     * 
+     * Delegates to the data store's getMessages() method.
+     * 
+     * @param maxNumber
+     *        The maximum number of messages to return.
+     * 
+     */
+    public NotificationMessage[] getMessages(int maxNumber) 
+        throws UnableToGetMessagesFault
+    {
+        return getDataStore().getMessages(maxNumber);
+    }
+    
+    /**
+     * 
+     * @return The resource set with setSubscription(), or null if none 
+     *         has been set.
+     * 
+     */
+    public WsResource getSubscription()
+    {
+        return _subscription;
+    }
+
+    /**
+     * 
+     * Creates and initializes the data store, then registers this object 
+     * as a message listener with the resource's NotificationConsumer 
+     * capability.
+     * 
+     */
+    public void initialize() 
+        throws SoapFault
+    {
+        super.initialize();
+        
+        _dataStore = createDataStore();
+        _dataStore.initialize();
+        
+        NotificationConsumer wsn = (NotificationConsumer)getResource().getCapability(WsnConstants.CONSUMER_URI);
+        wsn.addMessageListener(this);
+    }
+    
+    /**
+     * 
+     * Adds the message to the data store so it can be pulled later.
+     * 
+     */
+    public void process(NotificationMessage message) 
+        throws SoapFault
+    {
+        getDataStore().addMessage(message);
+    }
+
+    public void setSubscription(WsResource subscription)
+    {
+        _subscription = subscription;
+    }
+
+    /**
+     * 
+     * Shuts down the subscription (if one was set), then the data store, 
+     * then the capability itself.
+     * 
+     */
+    public void shutdown() 
+        throws SoapFault
+    {
+        //
+        // the subscription is set after creation, so it may be missing 
+        // if the subscribe step never completed
+        //
+        WsResource subscription = getSubscription();
+        
+        if (subscription != null)
+            subscription.shutdown();
+        
+        getDataStore().shutdown();
+        super.shutdown();
+    }
+}

Added: webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SimplePullPointCreation.java
URL: http://svn.apache.org/viewvc/webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SimplePullPointCreation.java?rev=414696&view=auto
==============================================================================
--- webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SimplePullPointCreation.java (added)
+++ webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SimplePullPointCreation.java Thu Jun 15 15:19:57 2006
@@ -0,0 +1,100 @@
+/*=============================================================================*
+ *  Copyright 2006 The Apache Software Foundation
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *=============================================================================*/
+
+package org.apache.muse.ws.notification.impl;
+
+import org.apache.muse.core.ResourceManager;
+import org.apache.muse.ws.addressing.EndpointReference;
+import org.apache.muse.ws.addressing.soap.SoapFault;
+import org.apache.muse.ws.notification.Filter;
+import org.apache.muse.ws.notification.NotificationProducer;
+import org.apache.muse.ws.notification.PullPoint;
+import org.apache.muse.ws.notification.PullPointCreation;
+import org.apache.muse.ws.notification.WsnConstants;
+import org.apache.muse.ws.notification.faults.UnableToCreatePullPointFault;
+import org.apache.muse.ws.resource.WsResource;
+import org.apache.muse.ws.resource.basefaults.BaseFault;
+import org.apache.muse.ws.resource.impl.AbstractWsResourceCapability;
+
+
+/**
+ *
+ * SimplePullPointCreation is an implementation of the PullPointCreation 
+ * capability. Each call to createPullPoint() creates a new pull point 
+ * resource and subscribes it to the NotificationProducer capability of 
+ * the resource that owns this capability.
+ *
+ */
+
+public class SimplePullPointCreation 
+    extends AbstractWsResourceCapability implements PullPointCreation
+{
+    //
+    // The context path of the pull point resource type - set in initialize()
+    //
+    private String _pullPointPath = null;
+    
+    /**
+     * 
+     * Creates a pull point resource, subscribes it to this resource's 
+     * notification producer using the pull point's own filter, and hands 
+     * the resulting subscription to the pull point.
+     * 
+     * @return The EPR of the new pull point resource.
+     * 
+     * @throws UnableToCreatePullPointFault
+     *         <ul>
+     *         <li>If the pull point resource could not be created.</li>
+     *         <li>If the subscription could not be created.</li>
+     *         </ul>
+     * 
+     */
+    public EndpointReference createPullPoint() 
+        throws UnableToCreatePullPointFault
+    {
+        ResourceManager manager = getResource().getResourceManager();
+        
+        //
+        // create the resource to represent the pull point
+        //
+        // NOTE(review): unlike the subscription creation code in this 
+        // package, the new resource is not explicitly initialized or added 
+        // to the resource manager here - presumably that happens elsewhere; 
+        // confirm against ResourceManager.createResource()
+        //
+        String endpoint = getPullPointContextPath();
+        WsResource pullPoint = null;
+        
+        try
+        {
+            pullPoint = (WsResource)manager.createResource(endpoint);
+        }
+        
+        catch (SoapFault error)
+        {
+            throw new UnableToCreatePullPointFault(error);
+        }
+        
+        EndpointReference epr = pullPoint.getEndpointReference();
+        
+        PullPoint pullPointCap = (PullPoint)pullPoint.getCapability(WsnConstants.PULL_POINT_URI);
+        Filter filter = pullPointCap.getFilter();
+        
+        //
+        // subscribe the pull point (as the consumer) to our own producer - 
+        // no termination time or policy is supplied
+        //
+        NotificationProducer wsn = (NotificationProducer)getResource().getCapability(WsnConstants.PRODUCER_URI);
+        WsResource sub = null;
+        
+        try
+        {
+            sub = wsn.subscribe(epr, filter, null, null);
+        }
+        
+        catch (BaseFault error)
+        {
+            throw new UnableToCreatePullPointFault(error);
+        }
+        
+        pullPointCap.setSubscription(sub);
+        
+        return epr;        
+    }
+    
+    /**
+     * 
+     * @return The context path that was found for the PullPoint resource 
+     *         type during initialize().
+     * 
+     */
+    protected String getPullPointContextPath()
+    {
+        return _pullPointPath;
+    }
+    
+    /**
+     * 
+     * Looks up the context path of the resource type that implements 
+     * PullPoint so that createPullPoint() can create instances of it.
+     * 
+     * @throws IllegalStateException
+     *         If the resource manager has no context path for PullPoint.
+     * 
+     */
+    public void initialize() throws SoapFault
+    {
+        super.initialize();
+        
+        //
+        // find the pull point resource type so we can create instances 
+        // of it on demand
+        //
+        ResourceManager manager = getResource().getResourceManager();
+        _pullPointPath = manager.getResourceContextPath(PullPoint.class);
+
+        if (_pullPointPath == null)
+            throw new IllegalStateException("No PullPoint endpoint deployed");
+    }
+}

Added: webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SimplePullPointDataStore.java
URL: http://svn.apache.org/viewvc/webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SimplePullPointDataStore.java?rev=414696&view=auto
==============================================================================
--- webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SimplePullPointDataStore.java (added)
+++ webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SimplePullPointDataStore.java Thu Jun 15 15:19:57 2006
@@ -0,0 +1,82 @@
+/*=============================================================================*
+ *  Copyright 2006 The Apache Software Foundation
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *=============================================================================*/
+
+package org.apache.muse.ws.notification.impl;
+
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+import org.apache.muse.ws.notification.NotificationMessage;
+import org.apache.muse.ws.notification.PullPointDataStore;
+
+/**
+ *
+ * SimplePullPointDataStore is an in-memory PullPointDataStore. Messages 
+ * are kept in a LinkedHashSet, so they are handed back in the order they 
+ * were added and a message equal to one already stored is not added a 
+ * second time. Nothing is persisted.
+ *
+ */
+
+public class SimplePullPointDataStore implements PullPointDataStore
+{
+    //
+    // Insertion-ordered collection of the messages waiting to be pulled
+    //
+    private Set _messages = new LinkedHashSet();
+    
+    private boolean _hasBeenInitialized = false;
+    private boolean _hasBeenShutdown = false;
+    
+    /**
+     * 
+     * Stores the message until it is removed by getMessages().
+     * 
+     */
+    public synchronized void addMessage(NotificationMessage message)
+    {
+        _messages.add(message);
+    }
+
+    /**
+     * 
+     * Removes and returns the oldest stored messages.
+     * 
+     * @param maxNumber
+     *        The maximum number of messages to return. A negative value 
+     *        results in a NegativeArraySizeException.
+     * 
+     * @return An array holding at most maxNumber messages, in the order 
+     *         they were added; the array is empty if nothing is stored.
+     * 
+     */
+    public synchronized NotificationMessage[] getMessages(int maxNumber)
+    {
+        //
+        // never hand back more than we actually have
+        //
+        int count = Math.min(maxNumber, _messages.size());
+        NotificationMessage[] batch = new NotificationMessage[count];
+        
+        Iterator cursor = _messages.iterator();
+        int index = 0;
+        
+        while (index < count)
+        {
+            batch[index] = (NotificationMessage)cursor.next();
+            cursor.remove();
+            ++index;
+        }
+        
+        return batch;
+    }
+
+    /**
+     * 
+     * @return True once initialize() has been called.
+     * 
+     */
+    public boolean hasBeenInitialized()
+    {
+        return _hasBeenInitialized;
+    }
+
+    /**
+     * 
+     * @return True once shutdown() has been called.
+     * 
+     */
+    public boolean hasBeenShutdown()
+    {
+        return _hasBeenShutdown;
+    }
+
+    /**
+     * 
+     * Only records that initialization has happened - there is nothing 
+     * else to set up.
+     * 
+     */
+    public void initialize()
+    {
+        _hasBeenInitialized = true;
+    }
+
+    /**
+     * 
+     * Only records that shutdown has happened - messages are not 
+     * persisted, so there is nothing to flush.
+     * 
+     */
+    public void shutdown()
+    {
+        _hasBeenShutdown = true;
+    }
+}

Added: webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SimpleSubscriptionManager.java
URL: http://svn.apache.org/viewvc/webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SimpleSubscriptionManager.java?rev=414696&view=auto
==============================================================================
--- webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SimpleSubscriptionManager.java (added)
+++ webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SimpleSubscriptionManager.java Thu Jun 15 15:19:57 2006
@@ -0,0 +1,273 @@
+/*=============================================================================*
+ *  Copyright 2006 The Apache Software Foundation
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *=============================================================================*/
+
+package org.apache.muse.ws.notification.impl;
+
+import java.util.Date;
+
+import javax.xml.namespace.QName;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+import org.apache.muse.core.Environment;
+import org.apache.muse.util.LoggingUtils;
+import org.apache.muse.util.messages.Messages;
+import org.apache.muse.util.messages.MessagesFactory;
+import org.apache.muse.util.xml.XmlUtils;
+import org.apache.muse.ws.addressing.EndpointReference;
+import org.apache.muse.ws.notification.Filter;
+import org.apache.muse.ws.notification.NotificationMessage;
+import org.apache.muse.ws.notification.Policy;
+import org.apache.muse.ws.notification.SubscriptionManager;
+import org.apache.muse.ws.notification.WsnConstants;
+import org.apache.muse.ws.notification.remote.NotificationConsumerClient;
+import org.apache.muse.ws.resource.impl.AbstractWsResourceCapability;
+import org.apache.muse.ws.resource.lifetime.WsrlConstants;
+import org.apache.muse.ws.addressing.soap.SoapFault;
+
+/**
+ *
+ * SimpleSubscriptionManager is Muse's default implementation of the 
+ * SubscriptionManager resource type (WS-N's SubscriptionManager). This class is 
+ * exposed as a first-class resource to web service clients. It extends 
+ * the {@linkplain AbstractWsResourceCapability basic resource capability} 
+ * defined by Muse and adds support for the SubscriptionManager properties.
+ * <br><br>
+ * Please refer to {@linkplain SubscriptionManager this documentation} for notes 
+ * about the relationship between SimpleTopic and SimpleSubscriptionManager, and how 
+ * users should publish messages to subscribers.
+ * <br><br>
+ * <b>NOTE:</b> This class does not support the optional filtering 
+ * capabilities for subscribers (preconditions, selectors, or policy); 
+ * when Muse is upgraded to WS-N 1.3, the subscription interface will 
+ * be simplified and have a more robust filtering API, so users should 
+ * not make too much investment in the current filtering interface.
+ *
+ * @author Dan Jemiolo (danjemiolo)
+ *
+ */
+
+public class SimpleSubscriptionManager 
+    extends AbstractWsResourceCapability implements SubscriptionManager
+{
+    //
+    // Name of the initialization parameter that turns on message tracing
+    //
+    private static final String _TRACE_PARAM = "trace-notifications";
+    
+    //
+    // Used to lookup all exception messages
+    //
+    private static final Messages _MESSAGES = MessagesFactory.get(SimpleSubscriptionManager.class);
+    
+    //
+    // The client that is used to publish messages to the consumer
+    //
+    private NotificationConsumerClient _client = null;
+    
+    private EndpointReference _consumer = null;
+    
+    //
+    // When was the subscription created? Right now!
+    //
+    private Date _creationTime = new Date();
+    
+    //
+    // The filter will approve of messages that are published
+    //
+    private Filter _filter = null;
+    
+    private boolean _isPaused = false;
+    
+    private EndpointReference _producer = null;
+
+    /**
+     * 
+     * <b>Plug-In Point:</b> This is a factory method that creates the 
+     * actual proxy used to send out notifications. It can be overridden 
+     * to provide a different implementation of the WS-N proxy class 
+     * without changing the implementation of subscription management.
+     *
+     * @return This implementation returns an instance of NotificationConsumerClient.
+     *
+     */
+    protected NotificationConsumerClient createConsumerClient()
+    {
+        EndpointReference consumer = getConsumerReference();
+        EndpointReference producer = getProducerReference();
+        Environment env = getEnvironment();
+        return new NotificationConsumerClient(consumer, producer, env);
+    }
+    
+    /**
+     * 
+     * @return The client created during initialize(), or null if 
+     *         initialize() has not been called.
+     * 
+     */
+    protected synchronized NotificationConsumerClient getConsumerClient()
+    {
+        return _client;
+    }
+    
+    public EndpointReference getConsumerReference()
+    {
+        return _consumer;
+    }
+    
+    /**
+     * 
+     * @return The time at which this object was instantiated.
+     * 
+     */
+    public Date getCreationTime()
+    {
+        return _creationTime;
+    }
+    
+    public Filter getFilter()
+    {
+        return _filter;
+    }
+    
+    public EndpointReference getProducerReference()
+    {
+        return _producer;
+    }
+    
+    public QName[] getPropertyNames()
+    {
+        return PROPERTIES;
+    }
+    
+    /**
+     * 
+     * This implementation always returns null.
+     * 
+     */
+    public String getSelector()
+    {
+        return null;
+    }
+    
+    /**
+     * 
+     * This implementation always returns null.
+     * 
+     */
+    public Policy getSubscriptionPolicy()
+    {
+        return null;
+    }
+    
+    /**
+     * 
+     * Verifies that the producer EPR, consumer EPR, and filter have been 
+     * set and that the resource has the WS-RL ScheduledTermination 
+     * capability, then creates the client used to send notifications. 
+     * Tracing is enabled on the client if the "trace-notifications" 
+     * initialization parameter is "true".
+     * 
+     * @throws IllegalStateException
+     *         If any of the required values or the ScheduledTermination 
+     *         capability is missing.
+     * 
+     */
+    public void initialize()
+        throws SoapFault
+    {
+        super.initialize();
+        
+        if (getProducerReference() == null)
+            throw new IllegalStateException(_MESSAGES.get("NoProducerEPR"));
+        
+        if (getConsumerReference() == null)
+            throw new IllegalStateException(_MESSAGES.get("NoConsumerEPR"));
+
+        if (getFilter() == null)
+            throw new IllegalStateException(_MESSAGES.get("NoFilter"));
+        
+        if (!getResource().hasCapability(WsrlConstants.SCHEDULED_TERMINATION_URI))
+            throw new IllegalStateException("Subscriptions must support WSRL ScheduledTermination");
+        
+        //
+        // tracing is off unless the parameter is present and "true"
+        //
+        boolean trace = false;
+        String traceString = getInitializationParameter(_TRACE_PARAM);
+        
+        if (traceString != null)
+            trace = Boolean.valueOf(traceString).booleanValue();
+
+        _client = createConsumerClient();
+        _client.setTrace(trace);
+    }
+    
+    public synchronized boolean isPaused()
+    {
+        return _isPaused;
+    }
+    
+    public synchronized void pauseSubscription()
+    {
+        _isPaused = true;
+    }
+    
+    /**
+     * 
+     * Sends the message to the consumer unless the subscription is paused 
+     * or the filter rejects the message. A failure to deliver is logged 
+     * and otherwise ignored so that later messages can still be sent.
+     * 
+     * @param message
+     *        The message to send to the consumer.
+     * 
+     */
+    public void publish(NotificationMessage message) 
+    {
+        if (isPaused() || !getFilter().accepts(message))
+            return;
+        
+        try
+        {
+            NotificationConsumerClient client = getConsumerClient();
+            client.notify(new NotificationMessage[]{ message });
+        }
+        
+        catch (SoapFault error)
+        {
+            LoggingUtils.logError(getLog(), error); // FIXME: extract message
+            getLog().info("The last message failed to publish - sending next message.");
+        }
+    }
+    
+    public synchronized void resumeSubscription()
+    {
+        _isPaused = false;
+    }
+    
+    /**
+     * 
+     * @throws NullPointerException
+     *         If the consumer EPR is null.
+     * 
+     */
+    public void setConsumerReference(EndpointReference consumer)
+    {
+        if (consumer == null)
+            throw new NullPointerException(_MESSAGES.get("NullConsumerEPR"));
+        
+        _consumer = consumer;
+    }
+    
+    /**
+     * 
+     * The filter is not checked here; initialize() fails if it is still 
+     * null at that point.
+     * 
+     */
+    public void setFilter(Filter filter)
+    {
+        _filter = filter;
+    }
+
+    /**
+     * 
+     * @throws NullPointerException
+     *         If the producer EPR is null.
+     * 
+     */
+    public void setProducerReference(EndpointReference producer) 
+    {
+        if (producer == null)
+            throw new NullPointerException(_MESSAGES.get("NullProducerEPR"));
+        
+        _producer = producer;
+    }
+
+    /**
+     * 
+     * Logs a warning message that this feature is not supported. Nothing 
+     * is logged if the policy is null, because in that case the caller 
+     * did not ask for the unsupported feature.
+     * 
+     */
+    public void setSubscriptionPolicy(Policy policy)
+    {
+        if (policy == null)
+            return;
+        
+        Object[] filler = { policy };
+        getLog().warning(_MESSAGES.get("PolicyNotSupported", filler));
+    }
+
+    public Element toXML()
+    {
+        return toXML(XmlUtils.EMPTY_DOC);
+    }
+
+    /**
+     * 
+     * @param factory
+     *        The document used to create the new elements.
+     * 
+     * @return A SubscribeResponse element that contains this subscription 
+     *         resource's EPR inside a subscription reference element.
+     * 
+     */
+    public Element toXML(Document factory)
+    {
+        Element root = XmlUtils.createElement(factory, WsnConstants.SUBSCRIBE_RESPONSE_QNAME);
+        
+        EndpointReference epr = getWsResource().getEndpointReference();
+        
+        Element subXML = XmlUtils.createElement(factory, WsnConstants.SUBSCRIPTION_EPR_QNAME);
+        XmlUtils.moveSubTree(epr.toXML(), subXML);
+        
+        root.appendChild(subXML);
+        
+        return root;
+    }
+}

Added: webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/Subscribe.java
URL: http://svn.apache.org/viewvc/webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/Subscribe.java?rev=414696&view=auto
==============================================================================
--- webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/Subscribe.java (added)
+++ webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/Subscribe.java Thu Jun 15 15:19:57 2006
@@ -0,0 +1,163 @@
+/*=============================================================================*
+ *  Copyright 2006 The Apache Software Foundation
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *=============================================================================*/
+
+package org.apache.muse.ws.notification.impl;
+
+import java.text.ParseException;
+import java.util.Date;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+import org.apache.muse.util.messages.Messages;
+import org.apache.muse.util.messages.MessagesFactory;
+import org.apache.muse.ws.addressing.EndpointReference;
+import org.apache.muse.ws.notification.Filter;
+import org.apache.muse.ws.notification.WsnConstants;
+import org.apache.muse.ws.notification.faults.*;
+import org.apache.muse.ws.notification.topics.WstConstants;
+import org.apache.muse.ws.resource.WsResource;
+import org.apache.muse.ws.resource.ext.faults.InvalidMessageFormatFault;
+import org.apache.muse.ws.addressing.soap.SoapFault;
+import org.apache.muse.util.xml.XmlSerializable;
+import org.apache.muse.util.xml.XmlUtils;
+
+/**
+ *
+ * Subscribe is a serializer/deserializer for the WS-Notification Subscribe 
+ * operation's request content.
+ *
+ * @author Dan Jemiolo (danj)
+ * 
+ */
+
+public class Subscribe implements XmlSerializable
+{
+    //
+    // Used to lookup all exception messages
+    //
+    private static final Messages _MESSAGES = MessagesFactory.get(Subscribe.class);
+    
+    private EndpointReference _consumer = null;
+    
+    private Filter _filter = null;
+
+    private Date _terminationTime = null;
+    
+    /**
+     * 
+     * Parses the consumer EPR, the filter, and the optional initial 
+     * termination time from a Subscribe request.
+     * 
+     * @param xml
+     *        The Subscribe request element.
+     * 
+     * @param resource
+     *        The resource handed to the FilterFactory when it builds the 
+     *        filter.
+     * 
+     * @throws InvalidMessageFormatFault
+     *         <ul>
+     *         <li>If the request has no consumer EPR element.</li>
+     *         <li>If the consumer EPR cannot be parsed.</li>
+     *         <li>If the termination time is not a valid date.</li>
+     *         </ul>
+     * 
+     */
+    public Subscribe(Element xml, WsResource resource) 
+        throws InvalidMessageFormatFault, 
+               InvalidFilterFault, 
+               TopicExpressionDialectUnknownFault, 
+               InvalidTopicExpressionFault, 
+               InvalidProducerPropertiesExpressionFault, 
+               InvalidMessageContentExpressionFault
+    {
+        Element eprXML = XmlUtils.getElement(xml, WsnConstants.CONSUMER_QNAME);
+        
+        //
+        // a request without a consumer EPR is malformed input from the 
+        // client, not a programming error, so it is reported as a fault
+        //
+        if (eprXML == null)
+            throw new InvalidMessageFormatFault(_MESSAGES.get("NullConsumerElement"));
+        
+        try
+        {
+            _consumer = new EndpointReference(eprXML);
+        }
+        
+        catch (SoapFault error)
+        {
+            throw new InvalidMessageFormatFault(error);
+        }
+        
+        Element filterXML = XmlUtils.getElement(xml, WsnConstants.FILTER_QNAME);
+        FilterFactory factory = FilterFactory.getInstance();
+        _filter = factory.newInstance(filterXML, resource);
+        
+        //
+        // the termination time is optional - it stays null if not present
+        //
+        Element timeXML = XmlUtils.getElement(xml, WsnConstants.INIT_TERMINATION_TIME_QNAME);
+        
+        try
+        {
+            if (timeXML != null)
+                _terminationTime = XmlUtils.getDate(timeXML);
+        }
+        
+        catch (ParseException error)
+        {
+            Object[] filler = { error.getMessage() };
+            throw new InvalidMessageFormatFault(_MESSAGES.get("InvalidTerminationTime", filler));
+        }
+    }
+    
+    /**
+     * 
+     * @param consumer
+     *        The EPR of the consumer; a new EndpointReference is created 
+     *        from it using the WS-N consumer QName.
+     * 
+     * @param filter
+     *        The filter for the subscription. This may be null.
+     * 
+     * @param terminationTime
+     *        The initial termination time. This may be null.
+     * 
+     * @throws NullPointerException
+     *         If the consumer EPR is null.
+     * 
+     */
+    public Subscribe(EndpointReference consumer, Filter filter, Date terminationTime)
+    {
+        if (consumer == null)
+            throw new NullPointerException(_MESSAGES.get("NullConsumerEPR"));
+        
+        _consumer = new EndpointReference(consumer, WsnConstants.CONSUMER_QNAME);
+        _filter = filter;
+        _terminationTime = terminationTime;
+    }
+    
+    public EndpointReference getConsumerReference()
+    {
+        return _consumer;
+    }
+    
+    /**
+     * 
+     * @param name
+     *        The topic name to inspect.
+     * 
+     * @return The concrete topic dialect URI if the name contains a '/', 
+     *         otherwise the simple topic dialect URI.
+     * 
+     */
+    protected String getDialect(String name)
+    {
+        return name.indexOf('/') >= 0 ? WstConstants.CONCRETE_TOPIC_URI : WstConstants.SIMPLE_TOPIC_URI;
+    }
+    
+    /**
+     * 
+     * @return The initial termination time, or null if none was given.
+     * 
+     */
+    public Date getTerminationTime()
+    {
+        return _terminationTime;
+    }
+    
+    public Filter getFilter()
+    {
+        return _filter;
+    }
+    
+    /**
+     * 
+     * @return A four-element array: the consumer EPR, the filter, the 
+     *         termination time, and null. NOTE(review): the trailing null 
+     *         presumably fills the policy argument of the subscribe 
+     *         operation - confirm against NotificationProducer.
+     * 
+     */
+    public Object[] toArray()
+    {
+        return new Object[]{ 
+            getConsumerReference(), getFilter(), getTerminationTime(), null 
+        };
+    }
+    
+    public Element toXML()
+    {
+        return toXML(XmlUtils.EMPTY_DOC);
+    }
+    
+    /**
+     * 
+     * @param doc
+     *        The document used to create the new elements.
+     * 
+     * @return A Subscribe element holding the consumer EPR plus the 
+     *         filter and termination time, if they are not null.
+     * 
+     * @throws NullPointerException
+     *         If the document is null.
+     * 
+     */
+    public Element toXML(Document doc)
+    {
+        if (doc == null)
+            throw new NullPointerException(_MESSAGES.get("NullDocument"));
+        
+        Element root = XmlUtils.createElement(doc, WsnConstants.SUBSCRIBE_QNAME);
+        
+        EndpointReference consumer = getConsumerReference();
+        XmlUtils.setElement(root, WsnConstants.CONSUMER_QNAME, consumer);
+        
+        Filter filter = getFilter();
+        
+        if (filter != null)
+            XmlUtils.setElement(root, WsnConstants.FILTER_QNAME, filter);
+        
+        Date termination = getTerminationTime();
+        
+        if (termination != null)
+            XmlUtils.setElement(root, WsnConstants.INIT_TERMINATION_TIME_QNAME, termination);
+        
+        return root;
+    }
+}

Added: webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SubscribeHandler.java
URL: http://svn.apache.org/viewvc/webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SubscribeHandler.java?rev=414696&view=auto
==============================================================================
--- webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SubscribeHandler.java (added)
+++ webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SubscribeHandler.java Thu Jun 15 15:19:57 2006
@@ -0,0 +1,67 @@
+/*=============================================================================*
+ *  Copyright 2006 The Apache Software Foundation
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *=============================================================================*/
+
+package org.apache.muse.ws.notification.impl;
+
+import java.util.Date;
+
+import org.w3c.dom.Element;
+
+import org.apache.muse.core.routing.AbstractMessageHandler;
+import org.apache.muse.ws.addressing.soap.SoapFault;
+import org.apache.muse.ws.notification.WsnConstants;
+import org.apache.muse.ws.resource.WsResource;
+import org.apache.muse.ws.resource.lifetime.ScheduledTermination;
+import org.apache.muse.ws.resource.lifetime.WsrlConstants;
+
+/**
+ * SubscribeHandler converts between XML and Java objects for the 
+ * WS-Notification Subscribe operation: it parses the request into the 
+ * operation's arguments and serializes the resulting subscription 
+ * resource into the response.
+ * 
+ * @author Dan Jemiolo (danj)
+ * 
+ * @see org.apache.muse.ws.notification.NotificationProducer
+ * 
+ */
+
+public class SubscribeHandler extends AbstractMessageHandler
+{
+    //
+    // The resource that is passed along when parsing Subscribe requests
+    //
+    private WsResource _resource = null;
+    
+    /**
+     * 
+     * @param resource
+     *        The resource handed to each Subscribe object created while 
+     *        parsing requests.
+     * 
+     */
+    public SubscribeHandler(WsResource resource)
+    {
+        super(WsnConstants.SUBSCRIBE_QNAME);
+        _resource = resource;
+    }
+    
+    /**
+     * 
+     * @param xml
+     *        The Subscribe request element.
+     * 
+     * @return The values parsed from the request, in the order given by 
+     *         Subscribe.toArray().
+     * 
+     */
+    public Object[] fromXML(Element xml) 
+        throws SoapFault
+    {
+        return new Subscribe(xml, _resource).toArray();
+    }
+    
+    /**
+     * 
+     * @param result
+     *        The subscription resource (a WsResource) that was created.
+     * 
+     * @return The XML of a SubscribeResponse built from the subscription 
+     *         and the termination time reported by its ScheduledTermination 
+     *         capability.
+     * 
+     */
+    public Element toXML(Object result)
+        throws SoapFault
+    {
+        WsResource subscription = (WsResource)result;
+        
+        ScheduledTermination lifetime = 
+            (ScheduledTermination)subscription.getCapability(WsrlConstants.SCHEDULED_TERMINATION_URI);
+        Date terminationTime = lifetime.getTerminationTime();
+        
+        return new SubscribeResponse(subscription, terminationTime).toXML();
+    }
+}

Added: webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SubscribeResponse.java
URL: http://svn.apache.org/viewvc/webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SubscribeResponse.java?rev=414696&view=auto
==============================================================================
--- webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SubscribeResponse.java (added)
+++ webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/SubscribeResponse.java Thu Jun 15 15:19:57 2006
@@ -0,0 +1,135 @@
+/*=============================================================================*
+ *  Copyright 2006 The Apache Software Foundation
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *=============================================================================*/
+
+package org.apache.muse.ws.notification.impl;
+
+import java.text.ParseException;
+import java.util.Date;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+import org.apache.muse.util.messages.Messages;
+import org.apache.muse.util.messages.MessagesFactory;
+import org.apache.muse.util.xml.XmlSerializable;
+import org.apache.muse.util.xml.XmlUtils;
+import org.apache.muse.util.xml.XsdUtils;
+import org.apache.muse.ws.addressing.EndpointReference;
+import org.apache.muse.ws.addressing.soap.SoapFault;
+import org.apache.muse.ws.notification.WsnConstants;
+import org.apache.muse.ws.resource.WsResource;
+import org.apache.muse.ws.resource.ext.faults.InvalidMessageFormatFault;
+
+/**
+ *
+ * SubscribeResponse is a serializer/deserializer for the WS-Notification 
+ * Subscribe operation's response content.
+ *
+ * @author Dan Jemiolo (danj)
+ * 
+ */
+
+public class SubscribeResponse implements XmlSerializable
+{
+    //
+    // Used to lookup all exception messages
+    //
+    private static Messages _MESSAGES = MessagesFactory.get(SubscribeResponse.class);
+    
+    //
+    // Defaults to the moment this object was created; replaced when the 
+    // parsed XML carries its own CurrentTime value
+    //
+    private Date _currentTime = new Date();
+    
+    //
+    // The EPR of the subscription resource
+    //
+    private EndpointReference _subscriptionEPR = null;
+    
+    //
+    // Stays null when no termination time is available
+    //
+    private Date _terminationTime = null;
+    
+    /**
+     * 
+     * Deserializes a SubscribeResponse from its XML form.
+     *
+     * @param xml
+     *        The element holding the subscription EPR and the optional 
+     *        CurrentTime and TerminationTime values.
+     * 
+     * @throws InvalidMessageFormatFault
+     *         <ul>
+     *         <li>If the subscription EPR element is missing.</li>
+     *         <li>If the subscription EPR cannot be parsed.</li>
+     *         <li>If either time value cannot be parsed.</li>
+     *         </ul>
+     *
+     */
+    public SubscribeResponse(Element xml) 
+        throws InvalidMessageFormatFault
+    {
+        Element reference = XmlUtils.getElement(xml, WsnConstants.SUBSCRIPTION_EPR_QNAME);
+        
+        if (reference == null)
+            throw new InvalidMessageFormatFault(_MESSAGES.get("NoSubscriptionEPR"));
+        
+        try
+        {
+            _subscriptionEPR = new EndpointReference(reference);
+        }
+        
+        catch (SoapFault error)
+        {
+            throw new InvalidMessageFormatFault(error);
+        }
+        
+        //
+        // both values are optional - read them first, then parse 
+        // whichever ones are present
+        //
+        String currentText = XmlUtils.getElementText(xml, WsnConstants.CURRENT_TIME_QNAME);
+        String terminationText = XmlUtils.getElementText(xml, WsnConstants.TERMINATION_TIME_QNAME);
+        
+        _currentTime = parseTime(currentText, _currentTime);
+        _terminationTime = parseTime(terminationText, _terminationTime);
+    }
+    
+    /**
+     * 
+     * Creates a response for the given subscription resource.
+     *
+     * @param subscription
+     *        The resource whose EPR is reported in the response.
+     * 
+     * @param terminationTime
+     *        The subscription's termination time; may be null, in which 
+     *        case the value is left out of the XML.
+     *
+     */
+    public SubscribeResponse(WsResource subscription, Date terminationTime)
+    {
+        _terminationTime = terminationTime;
+        _subscriptionEPR = subscription.getEndpointReference();
+    }
+    
+    /**
+     * 
+     * @return The time parsed from the response XML or, if there was 
+     *         none, the time this object was created.
+     *
+     */
+    public Date getCurrentTime()
+    {
+        return _currentTime;
+    }
+    
+    /**
+     * 
+     * @return The EPR of the subscription resource.
+     *
+     */
+    public EndpointReference getSubscriptionReference()
+    {
+        return _subscriptionEPR;
+    }
+    
+    /**
+     * 
+     * @return The subscription's termination time, or null if none was set.
+     *
+     */
+    public Date getTerminationTime()
+    {
+        return _terminationTime;
+    }
+    
+    /**
+     * 
+     * @return The response XML, created with XmlUtils.EMPTY_DOC as the 
+     *         owner document.
+     *
+     */
+    public Element toXML()
+    {
+        return toXML(XmlUtils.EMPTY_DOC);
+    }
+    
+    /**
+     * 
+     * @param doc
+     *        The document used to create the response's root element.
+     * 
+     * @return A SubscribeResponse element holding the subscription EPR plus 
+     *         the current and termination times that are not null.
+     *
+     */
+    public Element toXML(Document doc)
+    {
+        Element response = XmlUtils.createElement(doc, WsnConstants.SUBSCRIBE_RESPONSE_QNAME);
+        
+        Element reference = getSubscriptionReference().toXML();
+        XmlUtils.setElement(response, WsnConstants.SUBSCRIPTION_EPR_QNAME, reference);
+        
+        Date now = getCurrentTime();
+        Date expiration = getTerminationTime();
+        
+        if (now != null)
+            XmlUtils.setElement(response, WsnConstants.CURRENT_TIME_QNAME, now);
+        
+        if (expiration != null)
+            XmlUtils.setElement(response, WsnConstants.TERMINATION_TIME_QNAME, expiration);
+        
+        return response;
+    }
+    
+    //
+    // Parses an optional timestamp: absent text keeps the fallback value, 
+    // and unparsable text is reported as an invalid message.
+    //
+    private static Date parseTime(String text, Date fallback)
+        throws InvalidMessageFormatFault
+    {
+        if (text == null)
+            return fallback;
+        
+        try
+        {
+            return XsdUtils.getLocalTime(text);
+        }
+        
+        catch (ParseException error)
+        {
+            throw new InvalidMessageFormatFault(error);
+        }
+    }
+}

Added: webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/TopicFilter.java
URL: http://svn.apache.org/viewvc/webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/TopicFilter.java?rev=414696&view=auto
==============================================================================
--- webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/TopicFilter.java (added)
+++ webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/impl/TopicFilter.java Thu Jun 15 15:19:57 2006
@@ -0,0 +1,88 @@
+/*=============================================================================*
+ *  Copyright 2006 The Apache Software Foundation
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *=============================================================================*/
+
+package org.apache.muse.ws.notification.impl;
+
+import javax.xml.namespace.QName;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+import org.apache.muse.util.xml.XmlUtils;
+import org.apache.muse.ws.notification.Filter;
+import org.apache.muse.ws.notification.NotificationMessage;
+import org.apache.muse.ws.notification.WsnConstants;
+import org.apache.muse.ws.notification.faults.InvalidTopicExpressionFault;
+import org.apache.muse.ws.notification.faults.TopicExpressionDialectUnknownFault;
+import org.apache.muse.ws.notification.topics.WstConstants;
+import org.apache.muse.ws.notification.topics.impl.ConcretePathExpression;
+
+public class TopicFilter implements Filter
+{
+    private QName _topicName = null;
+    
+    public TopicFilter(QName topicName) 
+        throws TopicExpressionDialectUnknownFault, 
+               InvalidTopicExpressionFault
+    {
+        this(topicName, WstConstants.CONCRETE_TOPIC_URI);
+    }
+    
+    public TopicFilter(QName topicName, String dialect) 
+        throws TopicExpressionDialectUnknownFault, 
+               InvalidTopicExpressionFault
+    {
+        if (!dialect.equals(WstConstants.SIMPLE_TOPIC_URI) && 
+            !dialect.equals(WstConstants.CONCRETE_TOPIC_URI))
+            throw new TopicExpressionDialectUnknownFault("Invalid dialect: " + dialect);
+        
+        ConcretePathExpression.validateTopicPath(topicName);
+        
+        _topicName = topicName;
+    }
+    
+    public boolean accepts(NotificationMessage message)
+    {
+        QName messageTopic = message.getTopic();
+        return messageTopic != null && messageTopic.equals(_topicName);
+    }
+    
+    public QName getTopic()
+    {
+        return _topicName;
+    }
+    
+    public String toString()
+    {
+        return XmlUtils.toString(toXML(), false);
+    }
+
+    public Element toXML()
+    {
+        return toXML(XmlUtils.EMPTY_DOC);
+    }
+
+    public Element toXML(Document doc)
+    {
+        Element filter = XmlUtils.createElement(doc, WsnConstants.FILTER_QNAME);
+        
+        Element topic = XmlUtils.createElement(doc, WsnConstants.TOPIC_EXPRESSION_QNAME, getTopic());
+        topic.setAttribute(WsnConstants.DIALECT, WstConstants.CONCRETE_TOPIC_URI);
+        filter.appendChild(topic);
+        
+        return filter;
+    }
+}

Added: webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/properties/AbstractMessageListener.java
URL: http://svn.apache.org/viewvc/webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/properties/AbstractMessageListener.java?rev=414696&view=auto
==============================================================================
--- webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/properties/AbstractMessageListener.java (added)
+++ webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/properties/AbstractMessageListener.java Thu Jun 15 15:19:57 2006
@@ -0,0 +1,190 @@
+/*=============================================================================*
+ *  Copyright 2006 The Apache Software Foundation
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *=============================================================================*/
+
+package org.apache.muse.ws.notification.properties;
+
+import javax.xml.namespace.QName;
+
+import org.w3c.dom.Element;
+
+import org.apache.muse.util.messages.Messages;
+import org.apache.muse.util.messages.MessagesFactory;
+import org.apache.muse.util.xml.XmlUtils;
+import org.apache.muse.ws.addressing.soap.SoapFault;
+import org.apache.muse.ws.notification.NotificationMessage;
+import org.apache.muse.ws.notification.NotificationMessageListener;
+import org.apache.muse.ws.resource.ext.faults.InvalidMessageFormatFault;
+import org.apache.muse.ws.resource.properties.WsrpConstants;
+
+/**
+ *
+ * AbstractMessageListener is an abstract convenience class for 
+ * {@linkplain NotificationMessageListener TopicConsumers} that are subscribed to WS-RP 
+ * property change Topics. Such consumers can expect a WS-RP change notification 
+ * as part of the WS-N SimpleNotificationMessage they receive, and this class 
+ * provides some convenience methods for parsing out the actual property 
+ * values without having to filter through XML.
+ * <br><br>
+ * Users can implement messageReceived(SimpleNotificationMessage) by using the 
+ * getOldValue() and getNewValue() methods. The SimpleNotificationMessage class 
+ * can provide other data about where the message came from.
+ * <br><br>
+ * Users who wish to accept messages where the change notifications are 
+ * packaged in a different event format can sub-class this class and 
+ * override the getChangeNotification() method so that it parses the event 
+ * XML to find the change notification. This allows you to change the 
+ * expected message format without having to rewrite the WS-RP notification 
+ * parsing code.
+ *
+ * @author Dan Jemiolo (danj)
+ *
+ */
+
+public abstract class AbstractMessageListener implements NotificationMessageListener
+{
+    //
+    // Used to lookup all exception messages
+    //
+    private static Messages _MESSAGES = 
+        MessagesFactory.get(AbstractMessageListener.class);
+    
+    //
+    // The topic that this listener is subscribed to
+    //
+    private QName _topicPath = null;
+    
+    /**
+     * 
+     * @param topicPath
+     *        The qualified name of the WS-N topic this listener is 
+     *        subscribed to. It cannot be null.
+     *
+     */
+    public AbstractMessageListener(QName topicPath)
+    {
+        if (topicPath == null)
+            throw new NullPointerException(_MESSAGES.get("NullTopicPath"));
+        
+        _topicPath = topicPath;
+    }
+    
+    /**
+     * 
+     * <b>Plug-In Point:</b> This method locates the WS-RP change 
+     * notification element in the message's content. Users can override 
+     * it if they know that the notification element will be wrapped in 
+     * other XML (event formats such as CBE, WEF, etc.).
+     * <br><br>
+     * The default implementation asks the message for the content item 
+     * whose name is the WS-RP change notification QName.
+     *
+     * @param message
+     * 
+     * @return The WS-RP change notification XML.
+     * 
+     * @throws SoapFault
+     *         <ul>
+     *         <li>If the message has no content item with the WS-RP 
+     *         change notification name.</li>
+     *         </ul>
+     *
+     */
+    protected Element getChangeNotification(NotificationMessage message)
+        throws SoapFault
+    {
+        Element notification = message.getMessageContent(WsrpConstants.NOTIFICATION_QNAME);
+        
+        if (notification != null)
+            return notification;
+        
+        Object[] filler = { WsrpConstants.NOTIFICATION_QNAME }; // FIXME: message filler
+        throw new InvalidMessageFormatFault(_MESSAGES.get("InvalidNotification", filler));
+    }
+    
+    /**
+     * 
+     * Convenience method that finds the WS-RP change notification in the 
+     * message and retrieves the new (current) property instance from it.
+     *
+     * @param message
+     * 
+     * @return The first element under 
+     *         <em>ResourcePropertyValueChangeNotification/NewValue</em>, as 
+     *         returned by XmlUtils.getFirstElement() (presumably null when 
+     *         <em>NewValue</em> is empty, i.e. the property was deleted).
+     * 
+     * @throws SoapFault
+     *         <ul>
+     *         <li>If the change notification or its <em>NewValue</em> 
+     *         element is missing.</li>
+     *         </ul>
+     *
+     */
+    protected Element getNewValue(NotificationMessage message)
+        throws SoapFault
+    {
+        return getValue(getChangeNotification(message), WsrpConstants.NEW_VALUE_QNAME);
+    }
+    
+    /**
+     * 
+     * Convenience method that finds the WS-RP change notification in the 
+     * message and retrieves the old (previous) property instance from it.
+     *
+     * @param message
+     * 
+     * @return The first element under 
+     *         <em>ResourcePropertyValueChangeNotification/OldValue</em>, as 
+     *         returned by XmlUtils.getFirstElement() (presumably null when 
+     *         <em>OldValue</em> is empty, i.e. no previous value existed).
+     * 
+     * @throws SoapFault
+     *         <ul>
+     *         <li>If the change notification or its <em>OldValue</em> 
+     *         element is missing.</li>
+     *         </ul>
+     *
+     */
+    protected Element getOldValue(NotificationMessage message)
+        throws SoapFault
+    {
+        return getValue(getChangeNotification(message), WsrpConstants.OLD_VALUE_QNAME);
+    }
+    
+    /**
+     * 
+     * @return The qualified name of the topic given at construction time.
+     *
+     */
+    public QName getTopicPath()
+    {
+        return _topicPath;
+    }
+    
+    //
+    // Finds the named wrapper element (OldValue or NewValue) under the 
+    // change notification and returns the first element inside of it. A 
+    // missing wrapper is reported as an invalid message.
+    //
+    private Element getValue(Element changeRoot, QName qname)
+        throws SoapFault
+    {
+        Element wrapper = XmlUtils.getElement(changeRoot, qname);
+        
+        if (wrapper != null)
+            return XmlUtils.getFirstElement(wrapper);
+        
+        Object[] filler = { null, qname };
+        throw new InvalidMessageFormatFault(_MESSAGES.get("InvalidNotification", filler));
+    }
+}

Added: webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/properties/ChangeNotificationListener.java
URL: http://svn.apache.org/viewvc/webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/properties/ChangeNotificationListener.java?rev=414696&view=auto
==============================================================================
--- webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/properties/ChangeNotificationListener.java (added)
+++ webservices/muse/trunk/modules/muse-wsn/src-impl/org/apache/muse/ws/notification/properties/ChangeNotificationListener.java Thu Jun 15 15:19:57 2006
@@ -0,0 +1,227 @@
+/*=============================================================================*
+ *  Copyright 2006 The Apache Software Foundation
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *=============================================================================*/
+
+package org.apache.muse.ws.notification.properties;
+
+
+import javax.xml.namespace.QName;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+import org.apache.muse.util.xml.XmlUtils;
+import org.apache.muse.ws.addressing.soap.SoapFault;
+import org.apache.muse.ws.notification.NotificationMessage;
+import org.apache.muse.ws.notification.NotificationProducer;
+import org.apache.muse.ws.notification.impl.SimpleNotificationMessage;
+import org.apache.muse.ws.resource.basefaults.BaseFault;
+import org.apache.muse.ws.resource.basefaults.WsbfUtils;
+import org.apache.muse.ws.resource.properties.WsrpConstants;
+import org.apache.muse.ws.resource.properties.listeners.PropertyChangeListener;
+import org.apache.muse.ws.resource.properties.listeners.ResourcePropertyListeners;
+
+/**
+ *
+ * ChangeNotificationListener is a 
+ * {@linkplain PropertyChangeListener PropertyChangeListener} that reports 
+ * changes in a resource property to the equivalent WS-N SimpleTopic that is used 
+ * to publish such changes to subscribers. This listener adds the property 
+ * change to the set of messages the SimpleTopic has to publish, and if there are 
+ * any subscribers at that moment, they will receive a WS-N notification about 
+ * the change.
+ * <br><br>
+ * The WS-N {@linkplain SimpleNotificationMessage NotificationMessages} created by 
+ * this listener have a <em>ResourcePropertyValueChangeNotification</em> 
+ * content item. That content contains the old and new values of the property. 
+ * The schema for this content item is described in the WS-RP spec (under 
+ * <em>Subscription</em>).
+ * <br><br>
+ * Note that this class does not send the notification messages directly - that 
+ * is handled by the WS-N framework.
+ * <br><br>
+ * This class should be leveraged through the 
+ * {@linkplain ChangeNotificationListenerFactory ChangeNotificationListenerFactory} that 
+ * is provided in the {@linkplain ResourcePropertyListeners ResourcePropertyListeners} 
+ * interface. This will allow users to write code that is independent of how 
+ * the WS-N property change notifications are formatted.
+ *
+ * @author Dan Jemiolo (danj)
+ * 
+ * @see WsrpNotificationListenerFactory
+ *
+ */
+
+public class ChangeNotificationListener implements PropertyChangeListener
+{
+    //
+    // The name of the property we're reporting on; it doubles as the 
+    // name of the topic the changes are published to
+    //
+    private QName _topicName = null;
+    
+    //
+    // The producer used to publish the change messages
+    //
+    private NotificationProducer _wsn = null;
+    
+    /**
+     * 
+     * @param topicName
+     *        The name of the property being watched, which is also used 
+     *        as the topic of the messages that are published.
+     * 
+     * @param wsn
+     *        The producer that the change messages are published through.
+     *
+     */
+    public ChangeNotificationListener(QName topicName, NotificationProducer wsn)
+    {
+        _wsn = wsn;
+        _topicName = topicName;
+    }
+    
+    /**
+     * 
+     * <b>Plug-In Point:</b> Creates the content that is added to the WS-N 
+     * message. Users can override this method to provide a different event 
+     * format, possibly one that wraps the default WS-RP notification 
+     * created by createWsrpNotification(Object, Object).
+     * <br><br>
+     * This implementation returns the WS-RP notification as is.
+     *
+     * @param oldValue
+     * @param newValue
+     * 
+     * @return The XML that will be added to the message's content. The 
+     *         default value is the WS-RP change notification.
+     *
+     * @see #createWsrpNotification(Object, Object)
+     * 
+     */
+    protected Element createEventXML(Object oldValue, Object newValue)
+    {
+        return createWsrpNotification(oldValue, newValue);
+    }
+    
+    /**
+     * 
+     * Creates the WS-RP change notification XML from the given property 
+     * values. Users who need a different message format should normally 
+     * override createEventXML(Object, Object) and reuse this method 
+     * rather than override it.
+     *
+     * @param oldValue
+     *        The previous property value; may be null.
+     * 
+     * @param newValue
+     *        The current property value; may be null.
+     * 
+     * @return A root element named 
+     *         <em>ResourcePropertyValueChangeNotification</em> with two 
+     *         children, <em>OldValue</em> and <em>NewValue</em>, in that 
+     *         order. Each child is built around an element that is named 
+     *         after the property and holds the matching value; that inner 
+     *         element is null when the value is null.
+     *
+     */
+    protected Element createWsrpNotification(Object oldValue, Object newValue)
+    {
+        Element notification = XmlUtils.createElement(WsrpConstants.NOTIFICATION_QNAME);
+        Document owner = notification.getOwnerDocument();
+        
+        //
+        // the old property value
+        //
+        Element oldWrapper = createValueWrapper(owner, WsrpConstants.OLD_VALUE_QNAME, oldValue);
+        notification.appendChild(oldWrapper);
+        
+        //
+        // the new/current property value
+        //
+        Element newWrapper = createValueWrapper(owner, WsrpConstants.NEW_VALUE_QNAME, newValue);
+        notification.appendChild(newWrapper);
+        
+        return notification;
+    }
+    
+    //
+    // Builds one OldValue/NewValue wrapper. The property element is only 
+    // created for a non-null value; otherwise the wrapper is created with 
+    // a null child element.
+    //
+    private Element createValueWrapper(Document doc, QName wrapperName, Object value)
+    {
+        QName propertyName = getPropertyName();
+        Element propertyXML = null;
+        
+        if (value != null)
+            propertyXML = XmlUtils.createElement(doc, propertyName, value);
+        
+        return XmlUtils.createElement(doc, wrapperName, propertyXML, false);
+    }
+    
+    /**
+     * 
+     * @return The name of the property this listener reports on.
+     *
+     */
+    public QName getPropertyName()
+    {
+        return _topicName;
+    }
+    
+    /**
+     * 
+     * @return The topic of the published messages, which is the same 
+     *         QName as the property name.
+     *
+     */
+    protected QName getTopic()
+    {
+        return _topicName;
+    }
+    
+    /**
+     * 
+     * @return The producer that the change messages are published through.
+     *
+     */
+    protected NotificationProducer getNotificationProducer()
+    {
+        return _wsn;
+    }
+    
+    /**
+     * 
+     * Wraps the event XML from createEventXML(Object, Object) in a WS-N 
+     * message for this listener's topic and hands that message to the 
+     * notification producer for publishing.
+     * <br><br>
+     * NOTE: Users who wish to modify the format of the message that is 
+     * published should sub-class this class and override the 
+     * createEventXML(Object, Object) method rather than this one.
+     *
+     * @param oldValue
+     * @param newValue
+     * 
+     * @throws BaseFault
+     *         If the producer fails to publish the message; the original 
+     *         SoapFault is converted with WsbfUtils.convertToFault().
+     *
+     * @see #createEventXML(Object, Object)
+     *
+     */
+    public void propertyChanged(Element oldValue, Element newValue)
+        throws BaseFault
+    {
+        Element eventXML = createEventXML(oldValue, newValue);
+        
+        //
+        // put the event in a standard WS-N message
+        //
+        NotificationMessage notification = new SimpleNotificationMessage(getTopic());
+        notification.addMessageContent(eventXML);
+        
+        //
+        // publish to the property's topic
+        //
+        try
+        {
+            getNotificationProducer().publish(notification);
+        }
+        
+        catch (SoapFault error)
+        {
+            throw WsbfUtils.convertToFault(error);
+        }
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: muse-commits-unsubscribe@ws.apache.org
For additional commands, e-mail: muse-commits-help@ws.apache.org