You are viewing a plain text version of this content. The canonical link for it is here.
Posted to muse-commits@ws.apache.org by da...@apache.org on 2007/09/02 19:25:04 UTC

svn commit: r572042 - /webservices/muse/trunk/modules/muse-wsn-impl/src/org/apache/muse/ws/notification/impl/SimpleNotificationProducer.java

Author: danj
Date: Sun Sep  2 10:25:03 2007
New Revision: 572042

URL: http://svn.apache.org/viewvc?rev=572042&view=rev
Log:
Fix for MUSE-264 - patch from Chris Twiner. I modified it slightly to change where the exception handling was done, but the processFilter(s) logic is now in place.

Modified:
    webservices/muse/trunk/modules/muse-wsn-impl/src/org/apache/muse/ws/notification/impl/SimpleNotificationProducer.java

Modified: webservices/muse/trunk/modules/muse-wsn-impl/src/org/apache/muse/ws/notification/impl/SimpleNotificationProducer.java
URL: http://svn.apache.org/viewvc/webservices/muse/trunk/modules/muse-wsn-impl/src/org/apache/muse/ws/notification/impl/SimpleNotificationProducer.java?rev=572042&r1=572041&r2=572042&view=diff
==============================================================================
--- webservices/muse/trunk/modules/muse-wsn-impl/src/org/apache/muse/ws/notification/impl/SimpleNotificationProducer.java (original)
+++ webservices/muse/trunk/modules/muse-wsn-impl/src/org/apache/muse/ws/notification/impl/SimpleNotificationProducer.java Sun Sep  2 10:25:03 2007
@@ -1,680 +1,750 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.muse.ws.notification.impl;
-
-import java.lang.reflect.Method;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-import javax.xml.namespace.QName;
-
-import org.w3c.dom.Element;
-
-import org.apache.muse.core.Persistence;
-import org.apache.muse.core.Resource;
-import org.apache.muse.core.ResourceManager;
-import org.apache.muse.core.ResourceManagerListener;
-import org.apache.muse.core.routing.MessageHandler;
-import org.apache.muse.core.serializer.Serializer;
-import org.apache.muse.core.serializer.SerializerRegistry;
-import org.apache.muse.util.LoggingUtils;
-import org.apache.muse.util.ReflectUtils;
-import org.apache.muse.util.messages.Messages;
-import org.apache.muse.util.messages.MessagesFactory;
-import org.apache.muse.util.xml.XmlSerializable;
-import org.apache.muse.util.xml.XmlUtils;
-import org.apache.muse.ws.addressing.EndpointReference;
-import org.apache.muse.ws.addressing.soap.SoapFault;
-import org.apache.muse.ws.resource.basefaults.BaseFault;
-import org.apache.muse.ws.notification.*;
-import org.apache.muse.ws.notification.faults.*;
-import org.apache.muse.ws.notification.properties.*;
-import org.apache.muse.ws.notification.topics.*;
-import org.apache.muse.ws.notification.topics.impl.*;
-import org.apache.muse.ws.resource.WsResource;
-import org.apache.muse.ws.resource.impl.AbstractWsResourceCapability;
-import org.apache.muse.ws.resource.lifetime.ScheduledTermination;
-import org.apache.muse.ws.resource.lifetime.WsrlConstants;
-import org.apache.muse.ws.resource.metadata.MetadataDescriptor;
-import org.apache.muse.ws.resource.properties.ResourcePropertyCollection;
-import org.apache.muse.ws.resource.properties.WsrpConstants;
-import org.apache.muse.ws.resource.properties.listeners.PropertyChangeListener;
-
-/**
- *
- * SimpleNotificationProducer is Muse's default implementation of the 
- * WS-Notification NotificationProducer port type.
- * <br><br>
- * Resources that implement this port type should be deployed with the 
- * {@linkplain SubscriptionManager SubscriptionManager} type as well; the 
- * subscription resources cannot be managed without this endpoint.
- *
- * @author Dan Jemiolo (danj)
- *
- */
-
-public class SimpleNotificationProducer 
-    extends AbstractWsResourceCapability implements NotificationProducer, ResourceManagerListener
-{
-    //
-    // Used to lookup all exception messages
-    //
-    private static Messages _MESSAGES = MessagesFactory.get(SimpleNotificationProducer.class);
-    
-    //
-    // all topic names in the topic set - not a hierarchical collection
-    //
-    private Set _allTopicNames = new HashSet();
-    
-    //
-    // factory that will provide WSRP notifications with the right wrapper
-    //
-    private ChangeNotificationListenerFactory _listenerFactory = null;
-    
-    //
-    // context path of the subscription manager resource type
-    //
-    private String _subscriptionPath = null;
-    
-    //
-    // EPR -> current subscriptions
-    //
-    private Map _subscriptionsByEPR = new HashMap();
-    
-    //
-    // hierarchical topic set supported by the resource
-    //
-    private TopicSet _topicSet = new SimpleTopicSet();
-    
-    public synchronized void addSubscription(WsResource sub)
-    {
-        SubscriptionManager subMgr = 
-            (SubscriptionManager)sub.getCapability(WsnConstants.SUBSCRIPTION_MGR_URI);
-        EndpointReference epr = sub.getEndpointReference();
-        _subscriptionsByEPR.put(epr, subMgr);
-    }
-    
-    public Topic addTopic(QName topicName)
-        throws BaseFault
-    {
-        String namespace = topicName.getNamespaceURI();
-        TopicNamespace topicSpace = getTopicNamespace(namespace);
-        
-        //
-        // make sure the property's namespace has a topic space
-        //
-        if (topicSpace == null)
-            topicSpace = addTopicNamespace(namespace);
-        
-        //
-        // if the property is new, we need to add a topic for it so 
-        // clients can subscribe to change events
-        //
-        String localName = topicName.getLocalPart();
-        Topic topic = new SimpleTopic(localName, topicSpace);
-        topicSpace.addTopic(topic);
-        
-        //
-        // add to our collection of names as well as the topic set tree
-        //
-        _allTopicNames.add(topicName);
-        
-        return topic;
-    }
-    
-    public TopicNamespace addTopicNamespace(String namespace)
-        throws BaseFault
-    {
-        TopicNamespace topics = new SimpleTopicNamespace(namespace);
-        topics.setName(WsrpConstants.TOPIC_SPACE_NAME);
-        
-        _topicSet.addTopicNamespace(topics);
-        
-        return topics;
-    }
-    
-    protected MessageHandler createGetCurrentMessageHandler()
-    {
-        MessageHandler handler = new GetCurrentMessageHandler();
-        
-        Method method = ReflectUtils.getFirstMethod(getClass(), "getCurrentMessage");
-        handler.setMethod(method);
-        
-        return handler;
-    }
-    
-    /**
-     * 
-     * Users can override this method to provide an alternative implementation 
-     * of ChangeNotificationListenerFactory that wraps WS-RP change notifications 
-     * in a different set of XML content.
-     * 
-     * @return An instance of WsrpNotificationListenerFactory
-     *
-     */
-    protected ChangeNotificationListenerFactory createNotificationListenerFactory()
-    {
-        return new WsrpNotificationListenerFactory();
-    }
-    
-    /**
-     * 
-     * Users can override this method to provide an alternative implementation 
-     * of NotificationMessage.
-     *
-     * @return An instance of SimpleNotificationMessage
-     *
-     */
-    protected NotificationMessage createNotificationMessage()
-    {
-        return new SimpleNotificationMessage();
-    }
-    
-    protected MessageHandler createSubscribeHandler()
-    {
-        MessageHandler handler = new SubscribeHandler();
-        
-        Method method = ReflectUtils.getFirstMethod(getClass(), "subscribe");
-        handler.setMethod(method);
-        
-        return handler;
-    }
-    
-    public NotificationMessage getCurrentMessage(QName topicPath)
-        throws NoCurrentMessageOnTopicFault, 
-               TopicNotSupportedFault
-    {
-        Topic topic = getTopic(topicPath);
-        
-        if (topic == null)
-        {
-            Object[] filler = { topicPath };
-            throw new TopicNotSupportedFault(_MESSAGES.get("TopicNotFound", filler));
-        }
-        
-        NotificationMessage message = topic.getCurrentMessage();
-        
-        if (message == null)
-        {
-            Object[] filler = { topicPath };
-            throw new NoCurrentMessageOnTopicFault(_MESSAGES.get("NoMessageAvailable", filler));
-        }
-        
-        return message;
-    }
-    
-    public boolean getFixedTopicSet()
-    {
-        return true;
-    }
-    
-    protected ChangeNotificationListenerFactory getNotificationListenerFactory()
-    {
-        return _listenerFactory;
-    }
-    
-    public QName[] getPropertyNames()
-    {
-        return PROPERTIES;
-    }
-    
-    protected String getSubscriptionContextPath()
-    {
-        return _subscriptionPath;
-    }
-    
-    protected synchronized Collection getSubscriptions()
-    {
-        return Collections.unmodifiableCollection(_subscriptionsByEPR.values());
-    }
-    
-    public Topic getTopic(QName topicName)
-    {
-        TopicNamespace topicSpace = getTopicNamespace(topicName.getNamespaceURI());
-        
-        if (topicSpace == null)
-            return null;
-        
-        return topicSpace.getTopic(topicName.getLocalPart());
-    }
-    
-    public QName[] getTopicExpression()
-    {
-        QName[] array = new QName[_allTopicNames.size()];
-        return (QName[])_allTopicNames.toArray(array);
-    }
-    
-    public String[] getTopicExpressionDialect()
-    {
-        return new String[]{ WstConstants.CONCRETE_TOPIC_URI };
-    }
-    
-    public TopicNamespace getTopicNamespace(String namespace)
-    {
-        return getTopicSet().getTopicNamespace(namespace);
-    }
-
-    public TopicSet getTopicSet()
-    {
-        return _topicSet;
-    }
-    
-    protected boolean hasSubscription(EndpointReference subscriptionEPR)
-    {
-        return _subscriptionsByEPR.containsKey(subscriptionEPR);
-    }
-
-    public boolean hasTopic(QName topicName)
-    {
-        return _allTopicNames.contains(topicName);
-    }
-    
-    public void initialize()
-        throws SoapFault
-    {
-        super.initialize();
-        
-        _listenerFactory = createNotificationListenerFactory();
-        
-        setMessageHandler(createSubscribeHandler());
-        setMessageHandler(createGetCurrentMessageHandler());
-        
-        //
-        // make sure we're exposing the subscription endpoint so that 
-        // clients can manage subscriptions/consumers
-        //
-        WsResource resource = getWsResource();
-        ResourceManager manager = resource.getResourceManager();
-        _subscriptionPath = manager.getResourceContextPath(SubscriptionManager.class);
-
-        if (_subscriptionPath == null)
-            throw new RuntimeException(_MESSAGES.get("NoSubscriptionManager"));
-        
-        //
-        // make sure any persistence impl is of the right type
-        //
-        Persistence persistence = getPersistence();
-        
-        if (persistence != null)
-        {
-            if (!NotificationProducerPersistence.class.isAssignableFrom(persistence.getClass()))
-            {
-                Object[] filler = { NotificationProducerPersistence.class, persistence.getClass() };
-                throw new RuntimeException(_MESSAGES.get("IncorrectPersistenceRoot", filler));
-            }
-            
-            NotificationProducerPersistence npPersistence = (NotificationProducerPersistence)persistence;
-            npPersistence.setNotificationProducer(this);
-        }
-        
-        //
-        // make sure we can listen for new subscriptions/destructions
-        //
-        manager.addListener(this);
-    }
-    
-    public void initializeCompleted()
-        throws SoapFault
-    {
-        super.initializeCompleted();
-        
-        ChangeNotificationListenerFactory factory = getNotificationListenerFactory();
-        
-        //
-        // for every property in the resource's ws-rp doc, create a property 
-        // change listener that will send out notifications for each change
-        //
-        WsResource resource = getWsResource();
-        ResourcePropertyCollection props = resource.getPropertyCollection();
-        Iterator i = props.getPropertyNames().iterator();
-        
-        while (i.hasNext())
-        {
-            QName name = (QName)i.next();
-            
-            addTopic(name);
-            
-            PropertyChangeListener listener = factory.newInstance(name, resource);
-            props.addChangeListener(listener);
-        }        
-
-        //
-        // if the resource supports either WS-RL capability, add support 
-        // for the WS-RL termination topic
-        //
-        if (resource.hasCapability(WsrlConstants.IMMEDIATE_TERMINATION_URI) || 
-            resource.hasCapability(WsrlConstants.SCHEDULED_TERMINATION_URI))
-            addTopic(WsrlConstants.TERMINATION_TOPIC_QNAME);
-                
-        //
-        // read all of the wsnt:TopicExpression values from the RMD doc 
-        // and make sure we create Topic objects for them
-        //
-        MetadataDescriptor rmd = props.getMetadata();
-        
-        if (rmd.hasProperty(WsnConstants.TOPIC_EXPRESSION_QNAME))
-        {
-            Collection[] values = new Collection[2];
-            values[0] = rmd.getInitialValues(WsnConstants.TOPIC_EXPRESSION_QNAME);
-            values[1] = rmd.getStaticValues(WsnConstants.TOPIC_EXPRESSION_QNAME);
-            
-            for (int n = 0; n < values.length; ++n)
-            {
-                i = values[n].iterator();
-                
-                while (i.hasNext())
-                {
-                    Element topicExpression = (Element)i.next();
-                    QName topicName = XmlUtils.getQName(topicExpression);
-                    addTopic(topicName);
-                }
-            }
-        }
-    }
-    
-    public void prepareShutdown() 
-        throws SoapFault
-    {
-        //
-        // if WSRL is present, send a termination notification to subscribers 
-        // before all subscriptions are destroyed
-        //
-        if (hasTopic(WsrlConstants.TERMINATION_TOPIC_QNAME))
-        {
-            Element payload = XmlUtils.createElement(WsrlConstants.NOTIFICATION_QNAME);
-            XmlUtils.setElement(payload, WsrlConstants.TERMINATION_TIME_QNAME, new Date());
-            
-            publish(WsrlConstants.TERMINATION_TOPIC_QNAME, payload);
-        }
-        
-        super.prepareShutdown();
-    }
-    
-    public void publish(QName topicName, Element content)
-        throws SoapFault
-    {
-        publish(topicName, new Element[]{ content });
-    }
-    
-    public void publish(QName topicName, Element[] content)
-        throws SoapFault
-    {
-        //
-        // construct the message/payload
-        //
-        NotificationMessage message = createNotificationMessage();
-        
-        for (int n = 0; n < content.length; ++n)
-            message.addMessageContent(content[n]);
-        
-        message.setTopic(topicName);
-        
-        //
-        // give the message to each subscription so it can decide if it 
-        // should be sent or not
-        //
-        Iterator i = getSubscriptions().iterator();
-        
-        while (i.hasNext())
-        {
-            SubscriptionManager sub = (SubscriptionManager)i.next();
-            sub.publish(message);
-        }
-        
-        //
-        // if a topic was used, record the message as the 'current' message 
-        // for the topic (will be returned by getCurrentMessage())
-        //
-        if (topicName != null)
-        {
-            Topic topic = getTopic(topicName);
-            topic.setCurrentMessage(message);
-        }
-    }
-    
-    public void publish(QName topicName, XmlSerializable content)
-        throws SoapFault
-    {
-        publish(topicName, new XmlSerializable[]{ content });
-    }
-    
-    public void publish(QName topicName, XmlSerializable[] content)
-        throws SoapFault
-    {
-        Element[] contentXML = new Element[content.length];
-        
-        for (int n = 0; n < content.length; ++n)
-            contentXML[n] = content[n].toXML();
-        
-        publish(topicName, contentXML);
-    }
-    
-    public void publish(QName topicName, QName contentName, Object content)
-        throws SoapFault
-    {
-        publish(topicName, new QName[]{ contentName }, new Object[]{ content });
-    }
-    
-    public void publish(QName topicName, QName[] contentNames, Object[] content)
-        throws SoapFault
-    {
-        SerializerRegistry registry = SerializerRegistry.getInstance();
-        
-        Element[] contentXML = new Element[content.length];
-        
-        //
-        // get the right serializer and transform POJO to XML
-        //
-        for (int n = 0; n < content.length; ++n)
-        {
-            Class contentType = content[n].getClass();
-            Serializer ser = registry.getSerializer(contentType);
-            contentXML[n] = ser.toXML(content[n], contentNames[n]);
-        }
-        
-        publish(topicName, contentXML);
-    }
-    
-    public synchronized void removeSubscription(EndpointReference epr)
-    {
-        _subscriptionsByEPR.remove(epr);
-    }
-
-    public void resourceAdded(EndpointReference epr, Resource resource)
-    {
-        //
-        // no action taken - we only need to listen for resourceRemoved() 
-        // so that we can remove subscriptions that have been destroyed
-        //
-    }
-
-    public void resourceRemoved(EndpointReference epr)
-    {
-        //
-        // this call is synchronized, so if we're publishing, the 
-        // collection will not be updated until afterwards
-        //
-        boolean subExists = hasSubscription(epr);
-        
-        if (!subExists)
-            return;
-        
-        removeSubscription(epr);
-        
-        NotificationProducerPersistence persistence = (NotificationProducerPersistence)getPersistence();
-        
-        try
-        {
-            if (persistence != null)
-                persistence.resourceRemoved(epr);
-        }
-        
-        catch (SoapFault fault)
-        {
-            LoggingUtils.logError(getLog(), fault);
-        }
-    }
-
-    public WsResource subscribe(EndpointReference consumer, 
-                                Filter filter, 
-                                Date terminationTime, 
-                                Policy policy) 
-        throws TopicNotSupportedFault, 
-               UnacceptableInitialTerminationTimeFault,
-               SubscribeCreationFailedFault
-    {
-        if (consumer == null)
-            throw new NullPointerException(_MESSAGES.get("NullConsumerEPR"));
-        
-        //
-        // sanity check - the message handler should do this, but we handle 
-        // null filters just in case
-        //
-        if (filter == null)
-            filter = PublishAllMessagesFilter.getInstance();
-        
-        //
-        // HACK: ugh. topic filters are different from message 
-        //       pattern and properties filters because they 
-        //       rely on internal data structures (topics) that 
-        //       are more concretely defined than the other two 
-        //       filter types. it's a one-off. we have to check 
-        //       against the topic set of the resource, and there 
-        //       is no such equivalent task for other filter types. 
-        //       so we downcast.
-        //
-        if (filter instanceof TopicFilter)
-        {
-            TopicFilter topicFilter = (TopicFilter)filter;
-            QName topicName = topicFilter.getTopic();
-            
-            if (!hasTopic(topicName))
-            {
-                Object[] filler = { topicName };
-                throw new TopicNotSupportedFault(_MESSAGES.get("TopicNotFound", filler));
-            }
-            
-            Topic topic = getTopic(topicName);
-            topicFilter.setTopic(topic);            
-        }
-        
-        //
-        // HACK #2: we can't set the resource for producer properties filters 
-        //          during filter creation without making the filter creation 
-        //          API unusable in many desired use cases. so instead, we 
-        //          hack around it by adding the resource reference late so 
-        //          that the filter can still do the WSRP Get calls it needs 
-        //          to do in order to evaluate the message
-        //
-        else if (filter instanceof ProducerPropertiesFilter)
-        {
-            ProducerPropertiesFilter propertiesFilter = (ProducerPropertiesFilter)filter;
-            propertiesFilter.setResource(getWsResource());
-        }
-        
-        WsResource producer = getWsResource();
-        ResourceManager manager = producer.getResourceManager();
-        
-        //
-        // create the resource to represent the subscription
-        //
-        String endpoint = getSubscriptionContextPath();
-        WsResource sub = null;
-        
-        try
-        {
-            sub = (WsResource)manager.createResource(endpoint);
-        }
-        
-        catch (SoapFault error)
-        {
-            throw new SubscribeCreationFailedFault(error);
-        }
-        
-        //
-        // use the subscription capability to set all the subscription fields
-        //
-        SubscriptionManager subMgr = 
-            (SubscriptionManager)sub.getCapability(WsnConstants.SUBSCRIPTION_MGR_URI);
-        
-        EndpointReference producerEPR = producer.getEndpointReference();
-        subMgr.setProducerReference(producerEPR);
-        
-        subMgr.setConsumerReference(consumer);
-        subMgr.setFilter(filter);
-        subMgr.setSubscriptionPolicy(policy);
-        
-        try
-        {
-            sub.initialize();
-            manager.addResource(sub.getEndpointReference(), sub);
-        }
-        
-        catch (SoapFault error)
-        {
-            throw new SubscribeCreationFailedFault(error);            
-        }
-        
-        //
-        // set termination time AFTER we initialize so that we can rely 
-        // on the base Resource class' implementation of WS-RL (which 
-        // tries to apply the change immediately, rather than storing it 
-        // in a field and setting it during initialization)
-        //
-        if (sub.hasCapability(WsrlConstants.SCHEDULED_TERMINATION_URI))
-        {
-            ScheduledTermination wsrl = (ScheduledTermination)sub.getCapability(WsrlConstants.SCHEDULED_TERMINATION_URI);
-            
-            try
-            {
-                wsrl.setTerminationTime(terminationTime);
-            }
-            
-            catch (BaseFault error)
-            {
-                throw new UnacceptableInitialTerminationTimeFault(error);
-            }
-        }
-        
-        addSubscription(sub);
-        
-        NotificationProducerPersistence persistence = (NotificationProducerPersistence)getPersistence();
-        
-        try
-        {
-            if (persistence != null)
-                persistence.resourceAdded(sub.getEndpointReference(), sub);
-        }
-        
-        catch (SoapFault fault)
-        {
-            LoggingUtils.logError(getLog(), fault);
-        }
-        
-        return sub;
-    }
-}
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.muse.ws.notification.impl;
+
+import java.lang.reflect.Method;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import javax.xml.namespace.QName;
+
+import org.w3c.dom.Element;
+
+import org.apache.muse.core.Persistence;
+import org.apache.muse.core.Resource;
+import org.apache.muse.core.ResourceManager;
+import org.apache.muse.core.ResourceManagerListener;
+import org.apache.muse.core.routing.MessageHandler;
+import org.apache.muse.core.serializer.Serializer;
+import org.apache.muse.core.serializer.SerializerRegistry;
+import org.apache.muse.util.LoggingUtils;
+import org.apache.muse.util.ReflectUtils;
+import org.apache.muse.util.messages.Messages;
+import org.apache.muse.util.messages.MessagesFactory;
+import org.apache.muse.util.xml.XmlSerializable;
+import org.apache.muse.util.xml.XmlUtils;
+import org.apache.muse.ws.addressing.EndpointReference;
+import org.apache.muse.ws.addressing.soap.SoapFault;
+import org.apache.muse.ws.resource.basefaults.BaseFault;
+import org.apache.muse.ws.notification.*;
+import org.apache.muse.ws.notification.faults.*;
+import org.apache.muse.ws.notification.properties.*;
+import org.apache.muse.ws.notification.topics.*;
+import org.apache.muse.ws.notification.topics.impl.*;
+import org.apache.muse.ws.resource.WsResource;
+import org.apache.muse.ws.resource.impl.AbstractWsResourceCapability;
+import org.apache.muse.ws.resource.lifetime.ScheduledTermination;
+import org.apache.muse.ws.resource.lifetime.WsrlConstants;
+import org.apache.muse.ws.resource.metadata.MetadataDescriptor;
+import org.apache.muse.ws.resource.properties.ResourcePropertyCollection;
+import org.apache.muse.ws.resource.properties.WsrpConstants;
+import org.apache.muse.ws.resource.properties.listeners.PropertyChangeListener;
+
+/**
+ *
+ * SimpleNotificationProducer is Muse's default implementation of the 
+ * WS-Notification NotificationProducer port type.
+ * <br><br>
+ * Resources that implement this port type should be deployed with the 
+ * {@linkplain SubscriptionManager SubscriptionManager} type as well; the 
+ * subscription resources cannot be managed without this endpoint.
+ *
+ * @author Dan Jemiolo (danj)
+ *
+ */
+
+public class SimpleNotificationProducer 
+    extends AbstractWsResourceCapability implements NotificationProducer, ResourceManagerListener
+{
+    //
+    // Used to lookup all exception messages
+    //
+    private static Messages _MESSAGES = MessagesFactory.get(SimpleNotificationProducer.class);
+    
+    //
+    // all topic names in the topic set - not a hierarchical collection
+    //
+    private Set _allTopicNames = new HashSet();
+    
+    //
+    // factory that will provide WSRP notifications with the right wrapper
+    //
+    private ChangeNotificationListenerFactory _listenerFactory = null;
+    
+    //
+    // context path of the subscription manager resource type
+    //
+    private String _subscriptionPath = null;
+    
+    //
+    // EPR -> current subscriptions
+    //
+    private Map _subscriptionsByEPR = new HashMap();
+    
+    //
+    // hierarchical topic set supported by the resource
+    //
+    private TopicSet _topicSet = new SimpleTopicSet();
+    
+    public synchronized void addSubscription(WsResource sub)
+        throws SoapFault
+    {
+        SubscriptionManager subMgr = 
+            (SubscriptionManager)sub.getCapability(WsnConstants.SUBSCRIPTION_MGR_URI);
+        EndpointReference epr = sub.getEndpointReference();
+        
+        //
+        // allow persistent subscriptions to be processed as if added live 
+        //
+        processFilters(getWsResource(), subMgr.getFilter());
+        
+        _subscriptionsByEPR.put(epr, subMgr);
+    }
+    
+    public Topic addTopic(QName topicName)
+        throws BaseFault
+    {
+        String namespace = topicName.getNamespaceURI();
+        TopicNamespace topicSpace = getTopicNamespace(namespace);
+        
+        //
+        // make sure the property's namespace has a topic space
+        //
+        if (topicSpace == null)
+            topicSpace = addTopicNamespace(namespace);
+        
+        //
+        // if the property is new, we need to add a topic for it so 
+        // clients can subscribe to change events
+        //
+        String localName = topicName.getLocalPart();
+        Topic topic = new SimpleTopic(localName, topicSpace);
+        topicSpace.addTopic(topic);
+        
+        //
+        // add to our collection of names as well as the topic set tree
+        //
+        _allTopicNames.add(topicName);
+        
+        return topic;
+    }
+    
+    public TopicNamespace addTopicNamespace(String namespace)
+        throws BaseFault
+    {
+        TopicNamespace topics = new SimpleTopicNamespace(namespace);
+        topics.setName(WsrpConstants.TOPIC_SPACE_NAME);
+        
+        _topicSet.addTopicNamespace(topics);
+        
+        return topics;
+    }
+    
+    protected MessageHandler createGetCurrentMessageHandler()
+    {
+        MessageHandler handler = new GetCurrentMessageHandler();
+        
+        Method method = ReflectUtils.getFirstMethod(getClass(), "getCurrentMessage");
+        handler.setMethod(method);
+        
+        return handler;
+    }
+    
+    /**
+     * 
+     * Users can override this method to provide an alternative implementation 
+     * of ChangeNotificationListenerFactory that wraps WS-RP change notifications 
+     * in a different set of XML content.
+     * 
+     * @return An instance of WsrpNotificationListenerFactory
+     *
+     */
+    protected ChangeNotificationListenerFactory createNotificationListenerFactory()
+    {
+        return new WsrpNotificationListenerFactory();
+    }
+    
+    /**
+     * 
+     * Users can override this method to provide an alternative implementation 
+     * of NotificationMessage.
+     *
+     * @return An instance of SimpleNotificationMessage
+     *
+     */
+    protected NotificationMessage createNotificationMessage()
+    {
+        return new SimpleNotificationMessage();
+    }
+    
+    protected MessageHandler createSubscribeHandler()
+    {
+        MessageHandler handler = new SubscribeHandler();
+        
+        Method method = ReflectUtils.getFirstMethod(getClass(), "subscribe");
+        handler.setMethod(method);
+        
+        return handler;
+    }
+    
+    public NotificationMessage getCurrentMessage(QName topicPath)
+        throws NoCurrentMessageOnTopicFault, 
+               TopicNotSupportedFault
+    {
+        Topic topic = getTopic(topicPath);
+        
+        if (topic == null)
+        {
+            Object[] filler = { topicPath };
+            throw new TopicNotSupportedFault(_MESSAGES.get("TopicNotFound", filler));
+        }
+        
+        NotificationMessage message = topic.getCurrentMessage();
+        
+        if (message == null)
+        {
+            Object[] filler = { topicPath };
+            throw new NoCurrentMessageOnTopicFault(_MESSAGES.get("NoMessageAvailable", filler));
+        }
+        
+        return message;
+    }
+    
+    public boolean getFixedTopicSet()
+    {
+        return true;
+    }
+    
+    protected ChangeNotificationListenerFactory getNotificationListenerFactory()
+    {
+        return _listenerFactory;
+    }
+    
+    public QName[] getPropertyNames()
+    {
+        return PROPERTIES;
+    }
+    
+    protected String getSubscriptionContextPath()
+    {
+        return _subscriptionPath;
+    }
+    
+    protected synchronized Collection getSubscriptions()
+    {
+        return Collections.unmodifiableCollection(_subscriptionsByEPR.values());
+    }
+    
+    public Topic getTopic(QName topicName)
+    {
+        TopicNamespace topicSpace = getTopicNamespace(topicName.getNamespaceURI());
+        
+        if (topicSpace == null)
+            return null;
+        
+        return topicSpace.getTopic(topicName.getLocalPart());
+    }
+    
+    public QName[] getTopicExpression()
+    {
+        QName[] array = new QName[_allTopicNames.size()];
+        return (QName[])_allTopicNames.toArray(array);
+    }
+    
+    public String[] getTopicExpressionDialect()
+    {
+        return new String[]{ WstConstants.CONCRETE_TOPIC_URI };
+    }
+    
+    public TopicNamespace getTopicNamespace(String namespace)
+    {
+        return getTopicSet().getTopicNamespace(namespace);
+    }
+
+    public TopicSet getTopicSet()
+    {
+        return _topicSet;
+    }
+    
+    protected boolean hasSubscription(EndpointReference subscriptionEPR)
+    {
+        return _subscriptionsByEPR.containsKey(subscriptionEPR);
+    }
+
+    public boolean hasTopic(QName topicName)
+    {
+        return _allTopicNames.contains(topicName);
+    }
+    
+    public void initialize()
+        throws SoapFault
+    {
+        super.initialize();
+        
+        _listenerFactory = createNotificationListenerFactory();
+        
+        setMessageHandler(createSubscribeHandler());
+        setMessageHandler(createGetCurrentMessageHandler());
+        
+        //
+        // make sure we're exposing the subscription endpoint so that 
+        // clients can manage subscriptions/consumers
+        //
+        WsResource resource = getWsResource();
+        ResourceManager manager = resource.getResourceManager();
+        _subscriptionPath = manager.getResourceContextPath(SubscriptionManager.class);
+
+        if (_subscriptionPath == null)
+            throw new RuntimeException(_MESSAGES.get("NoSubscriptionManager"));
+        
+        //
+        // make sure any persistence impl is of the right type
+        //
+        Persistence persistence = getPersistence();
+        
+        if (persistence != null)
+        {
+            if (!NotificationProducerPersistence.class.isAssignableFrom(persistence.getClass()))
+            {
+                Object[] filler = { NotificationProducerPersistence.class, persistence.getClass() };
+                throw new RuntimeException(_MESSAGES.get("IncorrectPersistenceRoot", filler));
+            }
+            
+            NotificationProducerPersistence npPersistence = (NotificationProducerPersistence)persistence;
+            npPersistence.setNotificationProducer(this);
+        }
+        
+        //
+        // make sure we can listen for new subscriptions/destructions
+        //
+        manager.addListener(this);
+    }
+    
+    public void initializeCompleted()
+        throws SoapFault
+    {
+        super.initializeCompleted();
+        
+        ChangeNotificationListenerFactory factory = getNotificationListenerFactory();
+        
+        //
+        // for every property in the resource's ws-rp doc, create a property 
+        // change listener that will send out notifications for each change
+        //
+        WsResource resource = getWsResource();
+        ResourcePropertyCollection props = resource.getPropertyCollection();
+        Iterator i = props.getPropertyNames().iterator();
+        
+        while (i.hasNext())
+        {
+            QName name = (QName)i.next();
+            
+            addTopic(name);
+            
+            PropertyChangeListener listener = factory.newInstance(name, resource);
+            props.addChangeListener(listener);
+        }        
+
+        //
+        // if the resource supports either WS-RL capability, add support 
+        // for the WS-RL termination topic
+        //
+        if (resource.hasCapability(WsrlConstants.IMMEDIATE_TERMINATION_URI) || 
+            resource.hasCapability(WsrlConstants.SCHEDULED_TERMINATION_URI))
+            addTopic(WsrlConstants.TERMINATION_TOPIC_QNAME);
+                
+        //
+        // read all of the wsnt:TopicExpression values from the RMD doc 
+        // and make sure we create Topic objects for them
+        //
+        MetadataDescriptor rmd = props.getMetadata();
+        
+        if (rmd.hasProperty(WsnConstants.TOPIC_EXPRESSION_QNAME))
+        {
+            Collection[] values = new Collection[2];
+            values[0] = rmd.getInitialValues(WsnConstants.TOPIC_EXPRESSION_QNAME);
+            values[1] = rmd.getStaticValues(WsnConstants.TOPIC_EXPRESSION_QNAME);
+            
+            for (int n = 0; n < values.length; ++n)
+            {
+                i = values[n].iterator();
+                
+                while (i.hasNext())
+                {
+                    Element topicExpression = (Element)i.next();
+                    QName topicName = XmlUtils.getQName(topicExpression);
+                    addTopic(topicName);
+                }
+            }
+        }
+    }
+    
+    public void prepareShutdown() 
+        throws SoapFault
+    {
+        //
+        // if WSRL is present, send a termination notification to subscribers 
+        // before all subscriptions are destroyed
+        //
+        if (hasTopic(WsrlConstants.TERMINATION_TOPIC_QNAME))
+        {
+            Element payload = XmlUtils.createElement(WsrlConstants.NOTIFICATION_QNAME);
+            XmlUtils.setElement(payload, WsrlConstants.TERMINATION_TIME_QNAME, new Date());
+            
+            publish(WsrlConstants.TERMINATION_TOPIC_QNAME, payload);
+        }
+        
+        super.prepareShutdown();
+    }
+    
+    /**
+     * 
+     * Developers can override this to provide further custom processing of filters 
+     * being sure to call super.processFilters() at the end of their code.
+     * 
+     * @param res
+     * @param filter
+     * 
+     * @throws SoapFault 
+     * 
+     */
+    protected void processFilter(WsResource res, Filter filter) 
+        throws SoapFault 
+    {
+        //
+        // HACK: ugh. topic filters are different from message 
+        //       pattern and properties filters because they 
+        //       rely on internal data structures (topics) that 
+        //       are more concretely defined than the other two 
+        //       filter types. it's a one-off. we have to check 
+        //       against the topic set of the resource, and there 
+        //       is no such equivalent task for other filter types. 
+        //       so we downcast.
+        //
+        if (filter instanceof TopicFilter)
+        {
+            TopicFilter topicFilter = (TopicFilter)filter;
+            QName topicName = topicFilter.getTopic();
+            
+            if (!hasTopic(topicName))
+            {
+                Object[] filler = { topicName };
+                throw new TopicNotSupportedFault(_MESSAGES.get("TopicNotFound", filler));
+            }
+            
+            Topic topic = getTopic(topicName);
+            topicFilter.setTopic(topic);            
+        }
+        
+        //
+        // HACK #2: we can't set the resource for producer properties filters 
+        //          during filter creation without making the filter creation 
+        //          API unusable in many desired use cases. so instead, we 
+        //          hack around it by adding the resource reference late so 
+        //          that the filter can still do the WSRP Get calls it needs 
+        //          to do in order to evaluate the message
+        //
+        else if (filter instanceof ProducerPropertiesFilter)
+        {
+            ProducerPropertiesFilter propertiesFilter = (ProducerPropertiesFilter)filter;
+            propertiesFilter.setResource(res);
+        }
+    }
+    
+    protected void processFilters(WsResource res, Filter filter) 
+        throws SoapFault 
+    {
+        if (filter instanceof FilterCollection) 
+        {
+            FilterCollection collection = (FilterCollection)filter;
+            Iterator itr = collection.getFilters().iterator();
+            
+            while (itr.hasNext()) 
+            {
+                Filter subFilter = (Filter)itr.next();
+                processFilter(res, subFilter);
+            }
+        }
+        
+        else
+            processFilter(res, filter);
+    }
+    
+    public void publish(QName topicName, Element content)
+        throws SoapFault
+    {
+        publish(topicName, new Element[]{ content });
+    }
+    
+    public void publish(QName topicName, Element[] content)
+        throws SoapFault
+    {
+        //
+        // construct the message/payload
+        //
+        NotificationMessage message = createNotificationMessage();
+        
+        for (int n = 0; n < content.length; ++n)
+            message.addMessageContent(content[n]);
+        
+        message.setTopic(topicName);
+        
+        //
+        // give the message to each subscription so it can decide if it 
+        // should be sent or not
+        //
+        Iterator i = getSubscriptions().iterator();
+        
+        while (i.hasNext())
+        {
+            SubscriptionManager sub = (SubscriptionManager)i.next();
+            sub.publish(message);
+        }
+        
+        //
+        // if a topic was used, record the message as the 'current' message 
+        // for the topic (will be returned by getCurrentMessage())
+        //
+        if (topicName != null)
+        {
+            Topic topic = getTopic(topicName);
+            topic.setCurrentMessage(message);
+        }
+    }
+    
+    public void publish(QName topicName, XmlSerializable content)
+        throws SoapFault
+    {
+        publish(topicName, new XmlSerializable[]{ content });
+    }
+    
+    public void publish(QName topicName, XmlSerializable[] content)
+        throws SoapFault
+    {
+        Element[] contentXML = new Element[content.length];
+        
+        for (int n = 0; n < content.length; ++n)
+            contentXML[n] = content[n].toXML();
+        
+        publish(topicName, contentXML);
+    }
+    
+    public void publish(QName topicName, QName contentName, Object content)
+        throws SoapFault
+    {
+        publish(topicName, new QName[]{ contentName }, new Object[]{ content });
+    }
+    
+    public void publish(QName topicName, QName[] contentNames, Object[] content)
+        throws SoapFault
+    {
+        SerializerRegistry registry = SerializerRegistry.getInstance();
+        
+        Element[] contentXML = new Element[content.length];
+        
+        //
+        // get the right serializer and transform POJO to XML
+        //
+        for (int n = 0; n < content.length; ++n)
+        {
+            Class contentType = content[n].getClass();
+            Serializer ser = registry.getSerializer(contentType);
+            contentXML[n] = ser.toXML(content[n], contentNames[n]);
+        }
+        
+        publish(topicName, contentXML);
+    }
+    
+    public synchronized void removeSubscription(EndpointReference epr)
+    {
+        _subscriptionsByEPR.remove(epr);
+    }
+
+    public void resourceAdded(EndpointReference epr, Resource resource)
+    {
+        //
+        // no action taken - we only need to listen for resourceRemoved() 
+        // so that we can remove subscriptions that have been destroyed
+        //
+    }
+
+    public void resourceRemoved(EndpointReference epr)
+    {
+        //
+        // this call is synchronized, so if we're publishing, the 
+        // collection will not be updated until afterwards
+        //
+        boolean subExists = hasSubscription(epr);
+        
+        if (!subExists)
+            return;
+        
+        removeSubscription(epr);
+        
+        NotificationProducerPersistence persistence = (NotificationProducerPersistence)getPersistence();
+        
+        try
+        {
+            if (persistence != null)
+                persistence.resourceRemoved(epr);
+        }
+        
+        catch (SoapFault fault)
+        {
+            LoggingUtils.logError(getLog(), fault);
+        }
+    }
+
+    public WsResource subscribe(EndpointReference consumer, 
+                                Filter filter, 
+                                Date terminationTime, 
+                                Policy policy) 
+        throws TopicNotSupportedFault, 
+               UnacceptableInitialTerminationTimeFault,
+               SubscribeCreationFailedFault
+    {
+        if (consumer == null)
+            throw new NullPointerException(_MESSAGES.get("NullConsumerEPR"));
+        
+        //
+        // sanity check - the message handler should do this, but we handle 
+        // null filters just in case
+        //
+        if (filter == null)
+            filter = PublishAllMessagesFilter.getInstance();
+        
+        WsResource producer = getWsResource();
+        
+        try 
+        {
+            processFilters(producer, filter);
+        } 
+        
+        catch (SoapFault error) 
+        {
+            throw new SubscribeCreationFailedFault(error);
+        }
+        
+        ResourceManager manager = producer.getResourceManager();
+        
+        //
+        // create the resource to represent the subscription
+        //
+        String endpoint = getSubscriptionContextPath();
+        WsResource sub = null;
+        
+        try
+        {
+            sub = (WsResource)manager.createResource(endpoint);
+        }
+        
+        catch (SoapFault error)
+        {
+            throw new SubscribeCreationFailedFault(error);
+        }
+        
+        //
+        // use the subscription capability to set all the subscription fields
+        //
+        SubscriptionManager subMgr = 
+            (SubscriptionManager)sub.getCapability(WsnConstants.SUBSCRIPTION_MGR_URI);
+        
+        EndpointReference producerEPR = producer.getEndpointReference();
+        subMgr.setProducerReference(producerEPR);
+        
+        subMgr.setConsumerReference(consumer);
+        subMgr.setFilter(filter);
+        subMgr.setSubscriptionPolicy(policy);
+        
+        try
+        {
+            sub.initialize();
+            manager.addResource(sub.getEndpointReference(), sub);
+        }
+        
+        catch (SoapFault error)
+        {
+            throw new SubscribeCreationFailedFault(error);            
+        }
+        
+        //
+        // set termination time AFTER we initialize so that we can rely 
+        // on the base Resource class' implementation of WS-RL (which 
+        // tries to apply the change immediately, rather than storing it 
+        // in a field and setting it during initialization)
+        //
+        if (sub.hasCapability(WsrlConstants.SCHEDULED_TERMINATION_URI))
+        {
+            ScheduledTermination wsrl = (ScheduledTermination)sub.getCapability(WsrlConstants.SCHEDULED_TERMINATION_URI);
+            
+            try
+            {
+                wsrl.setTerminationTime(terminationTime);
+            }
+            
+            catch (BaseFault error)
+            {
+                throw new UnacceptableInitialTerminationTimeFault(error);
+            }
+        }
+        
+        try 
+        {
+            addSubscription(sub);
+        } 
+        
+        catch (SoapFault error) 
+        {
+            try
+            {
+                manager.removeResource(sub.getEndpointReference());
+            }
+            
+            catch (SoapFault error2)
+            {
+                error2.printStackTrace();
+            }
+            
+            throw new SubscribeCreationFailedFault(error);
+        }
+        
+        NotificationProducerPersistence persistence = (NotificationProducerPersistence)getPersistence();
+        
+        try
+        {
+            if (persistence != null)
+                persistence.resourceAdded(sub.getEndpointReference(), sub);
+        }
+        
+        catch (SoapFault fault)
+        {
+            LoggingUtils.logError(getLog(), fault);
+        }
+        
+        return sub;
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: muse-commits-unsubscribe@ws.apache.org
For additional commands, e-mail: muse-commits-help@ws.apache.org