You are viewing a plain text version of this content. The canonical link for it is here.
Posted to muse-commits@ws.apache.org by da...@apache.org on 2007/09/02 19:25:04 UTC
svn commit: r572042 -
/webservices/muse/trunk/modules/muse-wsn-impl/src/org/apache/muse/ws/notification/impl/SimpleNotificationProducer.java
Author: danj
Date: Sun Sep 2 10:25:03 2007
New Revision: 572042
URL: http://svn.apache.org/viewvc?rev=572042&view=rev
Log:
Fix for MUSE-264 - patch from Chris Twiner. I modified it slightly to change where the exception handling was done, but the processFilter(s) logic is now in place.
Modified:
webservices/muse/trunk/modules/muse-wsn-impl/src/org/apache/muse/ws/notification/impl/SimpleNotificationProducer.java
Modified: webservices/muse/trunk/modules/muse-wsn-impl/src/org/apache/muse/ws/notification/impl/SimpleNotificationProducer.java
URL: http://svn.apache.org/viewvc/webservices/muse/trunk/modules/muse-wsn-impl/src/org/apache/muse/ws/notification/impl/SimpleNotificationProducer.java?rev=572042&r1=572041&r2=572042&view=diff
==============================================================================
--- webservices/muse/trunk/modules/muse-wsn-impl/src/org/apache/muse/ws/notification/impl/SimpleNotificationProducer.java (original)
+++ webservices/muse/trunk/modules/muse-wsn-impl/src/org/apache/muse/ws/notification/impl/SimpleNotificationProducer.java Sun Sep 2 10:25:03 2007
@@ -1,680 +1,750 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.muse.ws.notification.impl;
-
-import java.lang.reflect.Method;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-import javax.xml.namespace.QName;
-
-import org.w3c.dom.Element;
-
-import org.apache.muse.core.Persistence;
-import org.apache.muse.core.Resource;
-import org.apache.muse.core.ResourceManager;
-import org.apache.muse.core.ResourceManagerListener;
-import org.apache.muse.core.routing.MessageHandler;
-import org.apache.muse.core.serializer.Serializer;
-import org.apache.muse.core.serializer.SerializerRegistry;
-import org.apache.muse.util.LoggingUtils;
-import org.apache.muse.util.ReflectUtils;
-import org.apache.muse.util.messages.Messages;
-import org.apache.muse.util.messages.MessagesFactory;
-import org.apache.muse.util.xml.XmlSerializable;
-import org.apache.muse.util.xml.XmlUtils;
-import org.apache.muse.ws.addressing.EndpointReference;
-import org.apache.muse.ws.addressing.soap.SoapFault;
-import org.apache.muse.ws.resource.basefaults.BaseFault;
-import org.apache.muse.ws.notification.*;
-import org.apache.muse.ws.notification.faults.*;
-import org.apache.muse.ws.notification.properties.*;
-import org.apache.muse.ws.notification.topics.*;
-import org.apache.muse.ws.notification.topics.impl.*;
-import org.apache.muse.ws.resource.WsResource;
-import org.apache.muse.ws.resource.impl.AbstractWsResourceCapability;
-import org.apache.muse.ws.resource.lifetime.ScheduledTermination;
-import org.apache.muse.ws.resource.lifetime.WsrlConstants;
-import org.apache.muse.ws.resource.metadata.MetadataDescriptor;
-import org.apache.muse.ws.resource.properties.ResourcePropertyCollection;
-import org.apache.muse.ws.resource.properties.WsrpConstants;
-import org.apache.muse.ws.resource.properties.listeners.PropertyChangeListener;
-
-/**
- *
- * SimpleNotificationProducer is Muse's default implementation of the
- * WS-Notification NotificationProducer port type.
- * <br><br>
- * Resources that implement this port type should be deployed with the
- * {@linkplain SubscriptionManager SubscriptionManager} type as well; the
- * subscription resources cannot be managed without this endpoint.
- *
- * @author Dan Jemiolo (danj)
- *
- */
-
-public class SimpleNotificationProducer
- extends AbstractWsResourceCapability implements NotificationProducer, ResourceManagerListener
-{
- //
- // Used to lookup all exception messages
- //
- private static Messages _MESSAGES = MessagesFactory.get(SimpleNotificationProducer.class);
-
- //
- // all topic names in the topic set - not a hierarchical collection
- //
- private Set _allTopicNames = new HashSet();
-
- //
- // factory that will provide WSRP notifications with the right wrapper
- //
- private ChangeNotificationListenerFactory _listenerFactory = null;
-
- //
- // context path of the subscription manager resource type
- //
- private String _subscriptionPath = null;
-
- //
- // EPR -> current subscriptions
- //
- private Map _subscriptionsByEPR = new HashMap();
-
- //
- // hierarchical topic set supported by the resource
- //
- private TopicSet _topicSet = new SimpleTopicSet();
-
- public synchronized void addSubscription(WsResource sub)
- {
- SubscriptionManager subMgr =
- (SubscriptionManager)sub.getCapability(WsnConstants.SUBSCRIPTION_MGR_URI);
- EndpointReference epr = sub.getEndpointReference();
- _subscriptionsByEPR.put(epr, subMgr);
- }
-
- public Topic addTopic(QName topicName)
- throws BaseFault
- {
- String namespace = topicName.getNamespaceURI();
- TopicNamespace topicSpace = getTopicNamespace(namespace);
-
- //
- // make sure the property's namespace has a topic space
- //
- if (topicSpace == null)
- topicSpace = addTopicNamespace(namespace);
-
- //
- // if the property is new, we need to add a topic for it so
- // clients can subscribe to change events
- //
- String localName = topicName.getLocalPart();
- Topic topic = new SimpleTopic(localName, topicSpace);
- topicSpace.addTopic(topic);
-
- //
- // add to our collection of names as well as the topic set tree
- //
- _allTopicNames.add(topicName);
-
- return topic;
- }
-
- public TopicNamespace addTopicNamespace(String namespace)
- throws BaseFault
- {
- TopicNamespace topics = new SimpleTopicNamespace(namespace);
- topics.setName(WsrpConstants.TOPIC_SPACE_NAME);
-
- _topicSet.addTopicNamespace(topics);
-
- return topics;
- }
-
- protected MessageHandler createGetCurrentMessageHandler()
- {
- MessageHandler handler = new GetCurrentMessageHandler();
-
- Method method = ReflectUtils.getFirstMethod(getClass(), "getCurrentMessage");
- handler.setMethod(method);
-
- return handler;
- }
-
- /**
- *
- * Users can override this method to provide an alternative implementation
- * of ChangeNotificationListenerFactory that wraps WS-RP change notifications
- * in a different set of XML content.
- *
- * @return An instance of WsrpNotificationListenerFactory
- *
- */
- protected ChangeNotificationListenerFactory createNotificationListenerFactory()
- {
- return new WsrpNotificationListenerFactory();
- }
-
- /**
- *
- * Users can override this method to provide an alternative implementation
- * of NotificationMessage.
- *
- * @return An instance of SimpleNotificationMessage
- *
- */
- protected NotificationMessage createNotificationMessage()
- {
- return new SimpleNotificationMessage();
- }
-
- protected MessageHandler createSubscribeHandler()
- {
- MessageHandler handler = new SubscribeHandler();
-
- Method method = ReflectUtils.getFirstMethod(getClass(), "subscribe");
- handler.setMethod(method);
-
- return handler;
- }
-
- public NotificationMessage getCurrentMessage(QName topicPath)
- throws NoCurrentMessageOnTopicFault,
- TopicNotSupportedFault
- {
- Topic topic = getTopic(topicPath);
-
- if (topic == null)
- {
- Object[] filler = { topicPath };
- throw new TopicNotSupportedFault(_MESSAGES.get("TopicNotFound", filler));
- }
-
- NotificationMessage message = topic.getCurrentMessage();
-
- if (message == null)
- {
- Object[] filler = { topicPath };
- throw new NoCurrentMessageOnTopicFault(_MESSAGES.get("NoMessageAvailable", filler));
- }
-
- return message;
- }
-
- public boolean getFixedTopicSet()
- {
- return true;
- }
-
- protected ChangeNotificationListenerFactory getNotificationListenerFactory()
- {
- return _listenerFactory;
- }
-
- public QName[] getPropertyNames()
- {
- return PROPERTIES;
- }
-
- protected String getSubscriptionContextPath()
- {
- return _subscriptionPath;
- }
-
- protected synchronized Collection getSubscriptions()
- {
- return Collections.unmodifiableCollection(_subscriptionsByEPR.values());
- }
-
- public Topic getTopic(QName topicName)
- {
- TopicNamespace topicSpace = getTopicNamespace(topicName.getNamespaceURI());
-
- if (topicSpace == null)
- return null;
-
- return topicSpace.getTopic(topicName.getLocalPart());
- }
-
- public QName[] getTopicExpression()
- {
- QName[] array = new QName[_allTopicNames.size()];
- return (QName[])_allTopicNames.toArray(array);
- }
-
- public String[] getTopicExpressionDialect()
- {
- return new String[]{ WstConstants.CONCRETE_TOPIC_URI };
- }
-
- public TopicNamespace getTopicNamespace(String namespace)
- {
- return getTopicSet().getTopicNamespace(namespace);
- }
-
- public TopicSet getTopicSet()
- {
- return _topicSet;
- }
-
- protected boolean hasSubscription(EndpointReference subscriptionEPR)
- {
- return _subscriptionsByEPR.containsKey(subscriptionEPR);
- }
-
- public boolean hasTopic(QName topicName)
- {
- return _allTopicNames.contains(topicName);
- }
-
- public void initialize()
- throws SoapFault
- {
- super.initialize();
-
- _listenerFactory = createNotificationListenerFactory();
-
- setMessageHandler(createSubscribeHandler());
- setMessageHandler(createGetCurrentMessageHandler());
-
- //
- // make sure we're exposing the subscription endpoint so that
- // clients can manage subscriptions/consumers
- //
- WsResource resource = getWsResource();
- ResourceManager manager = resource.getResourceManager();
- _subscriptionPath = manager.getResourceContextPath(SubscriptionManager.class);
-
- if (_subscriptionPath == null)
- throw new RuntimeException(_MESSAGES.get("NoSubscriptionManager"));
-
- //
- // make sure any persistence impl is of the right type
- //
- Persistence persistence = getPersistence();
-
- if (persistence != null)
- {
- if (!NotificationProducerPersistence.class.isAssignableFrom(persistence.getClass()))
- {
- Object[] filler = { NotificationProducerPersistence.class, persistence.getClass() };
- throw new RuntimeException(_MESSAGES.get("IncorrectPersistenceRoot", filler));
- }
-
- NotificationProducerPersistence npPersistence = (NotificationProducerPersistence)persistence;
- npPersistence.setNotificationProducer(this);
- }
-
- //
- // make sure we can listen for new subscriptions/destructions
- //
- manager.addListener(this);
- }
-
- public void initializeCompleted()
- throws SoapFault
- {
- super.initializeCompleted();
-
- ChangeNotificationListenerFactory factory = getNotificationListenerFactory();
-
- //
- // for every property in the resource's ws-rp doc, create a property
- // change listener that will send out notifications for each change
- //
- WsResource resource = getWsResource();
- ResourcePropertyCollection props = resource.getPropertyCollection();
- Iterator i = props.getPropertyNames().iterator();
-
- while (i.hasNext())
- {
- QName name = (QName)i.next();
-
- addTopic(name);
-
- PropertyChangeListener listener = factory.newInstance(name, resource);
- props.addChangeListener(listener);
- }
-
- //
- // if the resource supports either WS-RL capability, add support
- // for the WS-RL termination topic
- //
- if (resource.hasCapability(WsrlConstants.IMMEDIATE_TERMINATION_URI) ||
- resource.hasCapability(WsrlConstants.SCHEDULED_TERMINATION_URI))
- addTopic(WsrlConstants.TERMINATION_TOPIC_QNAME);
-
- //
- // read all of the wsnt:TopicExpression values from the RMD doc
- // and make sure we create Topic objects for them
- //
- MetadataDescriptor rmd = props.getMetadata();
-
- if (rmd.hasProperty(WsnConstants.TOPIC_EXPRESSION_QNAME))
- {
- Collection[] values = new Collection[2];
- values[0] = rmd.getInitialValues(WsnConstants.TOPIC_EXPRESSION_QNAME);
- values[1] = rmd.getStaticValues(WsnConstants.TOPIC_EXPRESSION_QNAME);
-
- for (int n = 0; n < values.length; ++n)
- {
- i = values[n].iterator();
-
- while (i.hasNext())
- {
- Element topicExpression = (Element)i.next();
- QName topicName = XmlUtils.getQName(topicExpression);
- addTopic(topicName);
- }
- }
- }
- }
-
- public void prepareShutdown()
- throws SoapFault
- {
- //
- // if WSRL is present, send a termination notification to subscribers
- // before all subscriptions are destroyed
- //
- if (hasTopic(WsrlConstants.TERMINATION_TOPIC_QNAME))
- {
- Element payload = XmlUtils.createElement(WsrlConstants.NOTIFICATION_QNAME);
- XmlUtils.setElement(payload, WsrlConstants.TERMINATION_TIME_QNAME, new Date());
-
- publish(WsrlConstants.TERMINATION_TOPIC_QNAME, payload);
- }
-
- super.prepareShutdown();
- }
-
- public void publish(QName topicName, Element content)
- throws SoapFault
- {
- publish(topicName, new Element[]{ content });
- }
-
- public void publish(QName topicName, Element[] content)
- throws SoapFault
- {
- //
- // construct the message/payload
- //
- NotificationMessage message = createNotificationMessage();
-
- for (int n = 0; n < content.length; ++n)
- message.addMessageContent(content[n]);
-
- message.setTopic(topicName);
-
- //
- // give the message to each subscription so it can decide if it
- // should be sent or not
- //
- Iterator i = getSubscriptions().iterator();
-
- while (i.hasNext())
- {
- SubscriptionManager sub = (SubscriptionManager)i.next();
- sub.publish(message);
- }
-
- //
- // if a topic was used, record the message as the 'current' message
- // for the topic (will be returned by getCurrentMessage())
- //
- if (topicName != null)
- {
- Topic topic = getTopic(topicName);
- topic.setCurrentMessage(message);
- }
- }
-
- public void publish(QName topicName, XmlSerializable content)
- throws SoapFault
- {
- publish(topicName, new XmlSerializable[]{ content });
- }
-
- public void publish(QName topicName, XmlSerializable[] content)
- throws SoapFault
- {
- Element[] contentXML = new Element[content.length];
-
- for (int n = 0; n < content.length; ++n)
- contentXML[n] = content[n].toXML();
-
- publish(topicName, contentXML);
- }
-
- public void publish(QName topicName, QName contentName, Object content)
- throws SoapFault
- {
- publish(topicName, new QName[]{ contentName }, new Object[]{ content });
- }
-
- public void publish(QName topicName, QName[] contentNames, Object[] content)
- throws SoapFault
- {
- SerializerRegistry registry = SerializerRegistry.getInstance();
-
- Element[] contentXML = new Element[content.length];
-
- //
- // get the right serializer and transform POJO to XML
- //
- for (int n = 0; n < content.length; ++n)
- {
- Class contentType = content[n].getClass();
- Serializer ser = registry.getSerializer(contentType);
- contentXML[n] = ser.toXML(content[n], contentNames[n]);
- }
-
- publish(topicName, contentXML);
- }
-
- public synchronized void removeSubscription(EndpointReference epr)
- {
- _subscriptionsByEPR.remove(epr);
- }
-
- public void resourceAdded(EndpointReference epr, Resource resource)
- {
- //
- // no action taken - we only need to listen for resourceRemoved()
- // so that we can remove subscriptions that have been destroyed
- //
- }
-
- public void resourceRemoved(EndpointReference epr)
- {
- //
- // this call is synchronized, so if we're publishing, the
- // collection will not be updated until afterwards
- //
- boolean subExists = hasSubscription(epr);
-
- if (!subExists)
- return;
-
- removeSubscription(epr);
-
- NotificationProducerPersistence persistence = (NotificationProducerPersistence)getPersistence();
-
- try
- {
- if (persistence != null)
- persistence.resourceRemoved(epr);
- }
-
- catch (SoapFault fault)
- {
- LoggingUtils.logError(getLog(), fault);
- }
- }
-
- public WsResource subscribe(EndpointReference consumer,
- Filter filter,
- Date terminationTime,
- Policy policy)
- throws TopicNotSupportedFault,
- UnacceptableInitialTerminationTimeFault,
- SubscribeCreationFailedFault
- {
- if (consumer == null)
- throw new NullPointerException(_MESSAGES.get("NullConsumerEPR"));
-
- //
- // sanity check - the message handler should do this, but we handle
- // null filters just in case
- //
- if (filter == null)
- filter = PublishAllMessagesFilter.getInstance();
-
- //
- // HACK: ugh. topic filters are different from message
- // pattern and properties filters because they
- // rely on internal data structures (topics) that
- // are more concretely defined than the other two
- // filter types. it's a one-off. we have to check
- // against the topic set of the resource, and there
- // is no such equivalent task for other filter types.
- // so we downcast.
- //
- if (filter instanceof TopicFilter)
- {
- TopicFilter topicFilter = (TopicFilter)filter;
- QName topicName = topicFilter.getTopic();
-
- if (!hasTopic(topicName))
- {
- Object[] filler = { topicName };
- throw new TopicNotSupportedFault(_MESSAGES.get("TopicNotFound", filler));
- }
-
- Topic topic = getTopic(topicName);
- topicFilter.setTopic(topic);
- }
-
- //
- // HACK #2: we can't set the resource for producer properties filters
- // during filter creation without making the filter creation
- // API unusable in many desired use cases. so instead, we
- // hack around it by adding the resource reference late so
- // that the filter can still do the WSRP Get calls it needs
- // to do in order to evaluate the message
- //
- else if (filter instanceof ProducerPropertiesFilter)
- {
- ProducerPropertiesFilter propertiesFilter = (ProducerPropertiesFilter)filter;
- propertiesFilter.setResource(getWsResource());
- }
-
- WsResource producer = getWsResource();
- ResourceManager manager = producer.getResourceManager();
-
- //
- // create the resource to represent the subscription
- //
- String endpoint = getSubscriptionContextPath();
- WsResource sub = null;
-
- try
- {
- sub = (WsResource)manager.createResource(endpoint);
- }
-
- catch (SoapFault error)
- {
- throw new SubscribeCreationFailedFault(error);
- }
-
- //
- // use the subscription capability to set all the subscription fields
- //
- SubscriptionManager subMgr =
- (SubscriptionManager)sub.getCapability(WsnConstants.SUBSCRIPTION_MGR_URI);
-
- EndpointReference producerEPR = producer.getEndpointReference();
- subMgr.setProducerReference(producerEPR);
-
- subMgr.setConsumerReference(consumer);
- subMgr.setFilter(filter);
- subMgr.setSubscriptionPolicy(policy);
-
- try
- {
- sub.initialize();
- manager.addResource(sub.getEndpointReference(), sub);
- }
-
- catch (SoapFault error)
- {
- throw new SubscribeCreationFailedFault(error);
- }
-
- //
- // set termination time AFTER we initialize so that we can rely
- // on the base Resource class' implementation of WS-RL (which
- // tries to apply the change immediately, rather than storing it
- // in a field and setting it during initialization)
- //
- if (sub.hasCapability(WsrlConstants.SCHEDULED_TERMINATION_URI))
- {
- ScheduledTermination wsrl = (ScheduledTermination)sub.getCapability(WsrlConstants.SCHEDULED_TERMINATION_URI);
-
- try
- {
- wsrl.setTerminationTime(terminationTime);
- }
-
- catch (BaseFault error)
- {
- throw new UnacceptableInitialTerminationTimeFault(error);
- }
- }
-
- addSubscription(sub);
-
- NotificationProducerPersistence persistence = (NotificationProducerPersistence)getPersistence();
-
- try
- {
- if (persistence != null)
- persistence.resourceAdded(sub.getEndpointReference(), sub);
- }
-
- catch (SoapFault fault)
- {
- LoggingUtils.logError(getLog(), fault);
- }
-
- return sub;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.muse.ws.notification.impl;
+
+import java.lang.reflect.Method;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import javax.xml.namespace.QName;
+
+import org.w3c.dom.Element;
+
+import org.apache.muse.core.Persistence;
+import org.apache.muse.core.Resource;
+import org.apache.muse.core.ResourceManager;
+import org.apache.muse.core.ResourceManagerListener;
+import org.apache.muse.core.routing.MessageHandler;
+import org.apache.muse.core.serializer.Serializer;
+import org.apache.muse.core.serializer.SerializerRegistry;
+import org.apache.muse.util.LoggingUtils;
+import org.apache.muse.util.ReflectUtils;
+import org.apache.muse.util.messages.Messages;
+import org.apache.muse.util.messages.MessagesFactory;
+import org.apache.muse.util.xml.XmlSerializable;
+import org.apache.muse.util.xml.XmlUtils;
+import org.apache.muse.ws.addressing.EndpointReference;
+import org.apache.muse.ws.addressing.soap.SoapFault;
+import org.apache.muse.ws.resource.basefaults.BaseFault;
+import org.apache.muse.ws.notification.*;
+import org.apache.muse.ws.notification.faults.*;
+import org.apache.muse.ws.notification.properties.*;
+import org.apache.muse.ws.notification.topics.*;
+import org.apache.muse.ws.notification.topics.impl.*;
+import org.apache.muse.ws.resource.WsResource;
+import org.apache.muse.ws.resource.impl.AbstractWsResourceCapability;
+import org.apache.muse.ws.resource.lifetime.ScheduledTermination;
+import org.apache.muse.ws.resource.lifetime.WsrlConstants;
+import org.apache.muse.ws.resource.metadata.MetadataDescriptor;
+import org.apache.muse.ws.resource.properties.ResourcePropertyCollection;
+import org.apache.muse.ws.resource.properties.WsrpConstants;
+import org.apache.muse.ws.resource.properties.listeners.PropertyChangeListener;
+
+/**
+ *
+ * SimpleNotificationProducer is Muse's default implementation of the
+ * WS-Notification NotificationProducer port type.
+ * <br><br>
+ * Resources that implement this port type should be deployed with the
+ * {@linkplain SubscriptionManager SubscriptionManager} type as well; the
+ * subscription resources cannot be managed without this endpoint.
+ *
+ * @author Dan Jemiolo (danj)
+ *
+ */
+
+public class SimpleNotificationProducer
+ extends AbstractWsResourceCapability implements NotificationProducer, ResourceManagerListener
+{
+ //
+ // Used to lookup all exception messages
+ //
+ private static Messages _MESSAGES = MessagesFactory.get(SimpleNotificationProducer.class);
+
+ //
+ // all topic names in the topic set - not a hierarchical collection
+ //
+ private Set _allTopicNames = new HashSet();
+
+ //
+ // factory that will provide WSRP notifications with the right wrapper
+ //
+ private ChangeNotificationListenerFactory _listenerFactory = null;
+
+ //
+ // context path of the subscription manager resource type
+ //
+ private String _subscriptionPath = null;
+
+ //
+ // EPR -> current subscriptions
+ //
+ private Map _subscriptionsByEPR = new HashMap();
+
+ //
+ // hierarchical topic set supported by the resource
+ //
+ private TopicSet _topicSet = new SimpleTopicSet();
+
+ public synchronized void addSubscription(WsResource sub)
+ throws SoapFault
+ {
+ SubscriptionManager subMgr =
+ (SubscriptionManager)sub.getCapability(WsnConstants.SUBSCRIPTION_MGR_URI);
+ EndpointReference epr = sub.getEndpointReference();
+
+ //
+ // allow persistent subscriptions to be processed as if added live
+ //
+ processFilters(getWsResource(), subMgr.getFilter());
+
+ _subscriptionsByEPR.put(epr, subMgr);
+ }
+
+ public Topic addTopic(QName topicName)
+ throws BaseFault
+ {
+ String namespace = topicName.getNamespaceURI();
+ TopicNamespace topicSpace = getTopicNamespace(namespace);
+
+ //
+ // make sure the property's namespace has a topic space
+ //
+ if (topicSpace == null)
+ topicSpace = addTopicNamespace(namespace);
+
+ //
+ // if the property is new, we need to add a topic for it so
+ // clients can subscribe to change events
+ //
+ String localName = topicName.getLocalPart();
+ Topic topic = new SimpleTopic(localName, topicSpace);
+ topicSpace.addTopic(topic);
+
+ //
+ // add to our collection of names as well as the topic set tree
+ //
+ _allTopicNames.add(topicName);
+
+ return topic;
+ }
+
+ public TopicNamespace addTopicNamespace(String namespace)
+ throws BaseFault
+ {
+ TopicNamespace topics = new SimpleTopicNamespace(namespace);
+ topics.setName(WsrpConstants.TOPIC_SPACE_NAME);
+
+ _topicSet.addTopicNamespace(topics);
+
+ return topics;
+ }
+
+ protected MessageHandler createGetCurrentMessageHandler()
+ {
+ MessageHandler handler = new GetCurrentMessageHandler();
+
+ Method method = ReflectUtils.getFirstMethod(getClass(), "getCurrentMessage");
+ handler.setMethod(method);
+
+ return handler;
+ }
+
+ /**
+ *
+ * Users can override this method to provide an alternative implementation
+ * of ChangeNotificationListenerFactory that wraps WS-RP change notifications
+ * in a different set of XML content.
+ *
+ * @return An instance of WsrpNotificationListenerFactory
+ *
+ */
+ protected ChangeNotificationListenerFactory createNotificationListenerFactory()
+ {
+ return new WsrpNotificationListenerFactory();
+ }
+
+ /**
+ *
+ * Users can override this method to provide an alternative implementation
+ * of NotificationMessage.
+ *
+ * @return An instance of SimpleNotificationMessage
+ *
+ */
+ protected NotificationMessage createNotificationMessage()
+ {
+ return new SimpleNotificationMessage();
+ }
+
+ protected MessageHandler createSubscribeHandler()
+ {
+ MessageHandler handler = new SubscribeHandler();
+
+ Method method = ReflectUtils.getFirstMethod(getClass(), "subscribe");
+ handler.setMethod(method);
+
+ return handler;
+ }
+
+ public NotificationMessage getCurrentMessage(QName topicPath)
+ throws NoCurrentMessageOnTopicFault,
+ TopicNotSupportedFault
+ {
+ Topic topic = getTopic(topicPath);
+
+ if (topic == null)
+ {
+ Object[] filler = { topicPath };
+ throw new TopicNotSupportedFault(_MESSAGES.get("TopicNotFound", filler));
+ }
+
+ NotificationMessage message = topic.getCurrentMessage();
+
+ if (message == null)
+ {
+ Object[] filler = { topicPath };
+ throw new NoCurrentMessageOnTopicFault(_MESSAGES.get("NoMessageAvailable", filler));
+ }
+
+ return message;
+ }
+
+ public boolean getFixedTopicSet()
+ {
+ return true;
+ }
+
+ protected ChangeNotificationListenerFactory getNotificationListenerFactory()
+ {
+ return _listenerFactory;
+ }
+
+ public QName[] getPropertyNames()
+ {
+ return PROPERTIES;
+ }
+
+ protected String getSubscriptionContextPath()
+ {
+ return _subscriptionPath;
+ }
+
+ protected synchronized Collection getSubscriptions()
+ {
+ return Collections.unmodifiableCollection(_subscriptionsByEPR.values());
+ }
+
+ public Topic getTopic(QName topicName)
+ {
+ TopicNamespace topicSpace = getTopicNamespace(topicName.getNamespaceURI());
+
+ if (topicSpace == null)
+ return null;
+
+ return topicSpace.getTopic(topicName.getLocalPart());
+ }
+
+ public QName[] getTopicExpression()
+ {
+ QName[] array = new QName[_allTopicNames.size()];
+ return (QName[])_allTopicNames.toArray(array);
+ }
+
+ public String[] getTopicExpressionDialect()
+ {
+ return new String[]{ WstConstants.CONCRETE_TOPIC_URI };
+ }
+
+ public TopicNamespace getTopicNamespace(String namespace)
+ {
+ return getTopicSet().getTopicNamespace(namespace);
+ }
+
+ public TopicSet getTopicSet()
+ {
+ return _topicSet;
+ }
+
+ protected boolean hasSubscription(EndpointReference subscriptionEPR)
+ {
+ return _subscriptionsByEPR.containsKey(subscriptionEPR);
+ }
+
+ public boolean hasTopic(QName topicName)
+ {
+ return _allTopicNames.contains(topicName);
+ }
+
+ public void initialize()
+ throws SoapFault
+ {
+ super.initialize();
+
+ _listenerFactory = createNotificationListenerFactory();
+
+ setMessageHandler(createSubscribeHandler());
+ setMessageHandler(createGetCurrentMessageHandler());
+
+ //
+ // make sure we're exposing the subscription endpoint so that
+ // clients can manage subscriptions/consumers
+ //
+ WsResource resource = getWsResource();
+ ResourceManager manager = resource.getResourceManager();
+ _subscriptionPath = manager.getResourceContextPath(SubscriptionManager.class);
+
+ if (_subscriptionPath == null)
+ throw new RuntimeException(_MESSAGES.get("NoSubscriptionManager"));
+
+ //
+ // make sure any persistence impl is of the right type
+ //
+ Persistence persistence = getPersistence();
+
+ if (persistence != null)
+ {
+ if (!NotificationProducerPersistence.class.isAssignableFrom(persistence.getClass()))
+ {
+ Object[] filler = { NotificationProducerPersistence.class, persistence.getClass() };
+ throw new RuntimeException(_MESSAGES.get("IncorrectPersistenceRoot", filler));
+ }
+
+ NotificationProducerPersistence npPersistence = (NotificationProducerPersistence)persistence;
+ npPersistence.setNotificationProducer(this);
+ }
+
+ //
+ // make sure we can listen for new subscriptions/destructions
+ //
+ manager.addListener(this);
+ }
+
+ public void initializeCompleted()
+ throws SoapFault
+ {
+ super.initializeCompleted();
+
+ ChangeNotificationListenerFactory factory = getNotificationListenerFactory();
+
+ //
+ // for every property in the resource's ws-rp doc, create a property
+ // change listener that will send out notifications for each change
+ //
+ WsResource resource = getWsResource();
+ ResourcePropertyCollection props = resource.getPropertyCollection();
+ Iterator i = props.getPropertyNames().iterator();
+
+ while (i.hasNext())
+ {
+ QName name = (QName)i.next();
+
+ addTopic(name);
+
+ PropertyChangeListener listener = factory.newInstance(name, resource);
+ props.addChangeListener(listener);
+ }
+
+ //
+ // if the resource supports either WS-RL capability, add support
+ // for the WS-RL termination topic
+ //
+ if (resource.hasCapability(WsrlConstants.IMMEDIATE_TERMINATION_URI) ||
+ resource.hasCapability(WsrlConstants.SCHEDULED_TERMINATION_URI))
+ addTopic(WsrlConstants.TERMINATION_TOPIC_QNAME);
+
+ //
+ // read all of the wsnt:TopicExpression values from the RMD doc
+ // and make sure we create Topic objects for them
+ //
+ MetadataDescriptor rmd = props.getMetadata();
+
+ if (rmd.hasProperty(WsnConstants.TOPIC_EXPRESSION_QNAME))
+ {
+ Collection[] values = new Collection[2];
+ values[0] = rmd.getInitialValues(WsnConstants.TOPIC_EXPRESSION_QNAME);
+ values[1] = rmd.getStaticValues(WsnConstants.TOPIC_EXPRESSION_QNAME);
+
+ for (int n = 0; n < values.length; ++n)
+ {
+ i = values[n].iterator();
+
+ while (i.hasNext())
+ {
+ Element topicExpression = (Element)i.next();
+ QName topicName = XmlUtils.getQName(topicExpression);
+ addTopic(topicName);
+ }
+ }
+ }
+ }
+
+ /**
+  * Publishes a termination notification on the WS-RL termination topic,
+  * when that topic is registered, and then runs the superclass shutdown
+  * preparation. The notification payload carries the current time as
+  * the termination time.
+  *
+  * @throws SoapFault propagated from publishing or from the superclass.
+  */
+ public void prepareShutdown()
+     throws SoapFault
+ {
+     QName terminationTopic = WsrlConstants.TERMINATION_TOPIC_QNAME;
+
+     // subscribers must hear about the termination before the
+     // superclass proceeds with the shutdown
+     if (hasTopic(terminationTopic))
+     {
+         Element notification = XmlUtils.createElement(WsrlConstants.NOTIFICATION_QNAME);
+         XmlUtils.setElement(notification, WsrlConstants.TERMINATION_TIME_QNAME, new Date());
+
+         publish(terminationTopic, notification);
+     }
+
+     super.prepareShutdown();
+ }
+
+ /**
+  * Applies producer-specific setup to a single filter. Topic filters are
+  * checked against this producer's topics and bound to the matching
+  * Topic; producer properties filters are handed the producer resource.
+  * Any other filter type is left untouched.
+  * <p>
+  * Developers can override this to add custom filter processing, being
+  * sure to call super.processFilter() at the end of their code.
+  *
+  * @param res the producer resource given to producer properties filters.
+  * @param filter the filter to prepare.
+  *
+  * @throws SoapFault a TopicNotSupportedFault when a topic filter names
+  *         a topic this producer does not have.
+  */
+ protected void processFilter(WsResource res, Filter filter)
+     throws SoapFault
+ {
+     // Topic filters depend on the producer's internal topic set, which
+     // has no counterpart for the other filter types - hence the downcast.
+     if (filter instanceof TopicFilter)
+     {
+         TopicFilter byTopic = (TopicFilter)filter;
+         QName requested = byTopic.getTopic();
+
+         if (!hasTopic(requested))
+             throw new TopicNotSupportedFault(_MESSAGES.get("TopicNotFound", new Object[]{ requested }));
+
+         byTopic.setTopic(getTopic(requested));
+         return;
+     }
+
+     // The resource is attached here, after filter creation, so that the
+     // filter creation API does not need a resource reference up front.
+     if (filter instanceof ProducerPropertiesFilter)
+         ((ProducerPropertiesFilter)filter).setResource(res);
+ }
+
+ /**
+  * Prepares a filter, or every filter inside a filter collection, for
+  * use with this producer. Collections are walked recursively, so a
+  * collection nested inside another collection has each of its member
+  * filters passed to processFilter() as well, rather than being handed
+  * to processFilter() as an opaque collection.
+  *
+  * @param res the producer resource forwarded to processFilter().
+  * @param filter a single filter or a FilterCollection.
+  *
+  * @throws SoapFault propagated from processFilter().
+  */
+ protected void processFilters(WsResource res, Filter filter)
+     throws SoapFault
+ {
+     if (!(filter instanceof FilterCollection))
+     {
+         processFilter(res, filter);
+         return;
+     }
+
+     Iterator members = ((FilterCollection)filter).getFilters().iterator();
+
+     // recurse instead of calling processFilter() directly so that
+     // members which are themselves collections get unpacked too
+     while (members.hasNext())
+         processFilters(res, (Filter)members.next());
+ }
+
+ public void publish(QName topicName, Element content)
+ throws SoapFault
+ {
+ publish(topicName, new Element[]{ content });
+ }
+
+ /**
+  * Builds one notification message from the given content, hands it to
+  * every current subscription, and then records it as the current
+  * message of the named topic.
+  *
+  * @param topicName the topic set on the message; when null, the
+  *        current-message bookkeeping is skipped.
+  * @param content the elements added to the message, in array order.
+  *
+  * @throws SoapFault propagated from building or delivering the message.
+  */
+ public void publish(QName topicName, Element[] content)
+     throws SoapFault
+ {
+     //
+     // construct the message/payload
+     //
+     NotificationMessage message = createNotificationMessage();
+
+     for (int n = 0; n < content.length; ++n)
+         message.addMessageContent(content[n]);
+
+     message.setTopic(topicName);
+
+     //
+     // give the message to each subscription so it can decide if it
+     // should be sent or not
+     //
+     Iterator i = getSubscriptions().iterator();
+
+     while (i.hasNext())
+     {
+         SubscriptionManager sub = (SubscriptionManager)i.next();
+         sub.publish(message);
+     }
+
+     //
+     // if a topic was used, record the message as the 'current' message
+     // for the topic (will be returned by getCurrentMessage())
+     //
+     if (topicName != null)
+     {
+         Topic topic = getTopic(topicName);
+
+         //
+         // the message has already been handed to the subscriptions at
+         // this point, so a topic name with no registered Topic must not
+         // surface as a NullPointerException - there is simply nothing
+         // to record the message against
+         //
+         if (topic != null)
+             topic.setCurrentMessage(message);
+     }
+ }
+
+ public void publish(QName topicName, XmlSerializable content)
+ throws SoapFault
+ {
+ publish(topicName, new XmlSerializable[]{ content });
+ }
+
+ /**
+  * Converts each content object to XML with its own toXML() and
+  * publishes the resulting elements, in the same order, through the
+  * Element-array overload.
+  *
+  * @param topicName the topic to publish under.
+  * @param content the objects to serialize into the message content.
+  *
+  * @throws SoapFault propagated from the Element-array overload.
+  */
+ public void publish(QName topicName, XmlSerializable[] content)
+     throws SoapFault
+ {
+     int count = content.length;
+     Element[] serialized = new Element[count];
+
+     for (int k = 0; k < count; k++)
+     {
+         XmlSerializable item = content[k];
+         serialized[k] = item.toXML();
+     }
+
+     publish(topicName, serialized);
+ }
+
+ public void publish(QName topicName, QName contentName, Object content)
+ throws SoapFault
+ {
+ publish(topicName, new QName[]{ contentName }, new Object[]{ content });
+ }
+
+ /**
+  * Serializes each content object to XML, using the serializer the
+  * registry returns for the object's runtime class and the element
+  * name at the same index of contentNames, then publishes the
+  * resulting elements through the Element-array overload.
+  *
+  * @param topicName the topic to publish under.
+  * @param contentNames the element names, indexed in parallel with content.
+  * @param content the objects to serialize into the message content.
+  *
+  * @throws SoapFault propagated from serialization or publishing.
+  */
+ public void publish(QName topicName, QName[] contentNames, Object[] content)
+     throws SoapFault
+ {
+     SerializerRegistry serializers = SerializerRegistry.getInstance();
+
+     int count = content.length;
+     Element[] serialized = new Element[count];
+
+     // POJO -> XML, one serializer lookup per object
+     for (int k = 0; k < count; k++)
+     {
+         Object item = content[k];
+         Serializer serializer = serializers.getSerializer(item.getClass());
+         serialized[k] = serializer.toXML(item, contentNames[k]);
+     }
+
+     publish(topicName, serialized);
+ }
+
+ /**
+ * Removes the entry stored under the given EPR from this producer's
+ * EPR-keyed subscription table. The method is synchronized on this
+ * producer instance.
+ *
+ * @param epr the key of the subscription entry to remove.
+ */
+ public synchronized void removeSubscription(EndpointReference epr)
+ {
+ _subscriptionsByEPR.remove(epr);
+ }
+
+ /**
+ * Resource-added callback; intentionally does nothing. This producer
+ * only reacts to resourceRemoved().
+ *
+ * @param epr the EPR of the added resource (unused).
+ * @param resource the added resource (unused).
+ */
+ public void resourceAdded(EndpointReference epr, Resource resource)
+ {
+ //
+ // no action taken - we only need to listen for resourceRemoved()
+ // so that we can remove subscriptions that have been destroyed
+ //
+ }
+
+ /**
+  * Resource-removed callback. When the removed EPR belongs to one of
+  * this producer's subscriptions, the subscription is dropped and the
+  * persistence mechanism, if one is configured, is told about the
+  * removal. A fault raised by the persistence mechanism is logged
+  * rather than propagated.
+  *
+  * @param epr the EPR of the resource that was removed.
+  */
+ public void resourceRemoved(EndpointReference epr)
+ {
+     // resources that are not our subscriptions are of no interest
+     if (!hasSubscription(epr))
+         return;
+
+     // removeSubscription() is synchronized on this producer
+     removeSubscription(epr);
+
+     NotificationProducerPersistence store = (NotificationProducerPersistence)getPersistence();
+
+     if (store == null)
+         return;
+
+     try
+     {
+         store.resourceRemoved(epr);
+     }
+
+     catch (SoapFault problem)
+     {
+         LoggingUtils.logError(getLog(), problem);
+     }
+ }
+
+ /**
+  * Creates, initializes and registers a subscription resource for the
+  * given consumer, and returns it.
+  * <p>
+  * The filter is first prepared with processFilters(). A new resource
+  * is then created at the subscription context path, configured through
+  * its subscription manager capability, initialized and added to the
+  * resource manager. The termination time is applied afterwards (only
+  * if the subscription has the scheduled termination capability), the
+  * subscription is added to this producer, and finally the persistence
+  * mechanism, if any, is notified. A fault from the persistence
+  * mechanism is logged and does not fail the call.
+  *
+  * @param consumer the EPR that will receive notifications; must not be null.
+  * @param filter the subscription filter; when null, the
+  *        publish-all-messages filter is used instead.
+  * @param terminationTime the value passed to the subscription's
+  *        scheduled termination capability.
+  * @param policy the subscription policy set on the subscription manager.
+  *
+  * @return the newly created subscription resource.
+  *
+  * @throws NullPointerException if consumer is null.
+  * @throws TopicNotSupportedFault if filter processing reports a topic
+  *         that this producer does not support.
+  * @throws UnacceptableInitialTerminationTimeFault if the termination
+  *         time is rejected; the subscription resource is removed from
+  *         the resource manager again before the fault is thrown.
+  * @throws SubscribeCreationFailedFault if any other step fails.
+  */
+ public WsResource subscribe(EndpointReference consumer,
+                             Filter filter,
+                             Date terminationTime,
+                             Policy policy)
+     throws TopicNotSupportedFault,
+            UnacceptableInitialTerminationTimeFault,
+            SubscribeCreationFailedFault
+ {
+     if (consumer == null)
+         throw new NullPointerException(_MESSAGES.get("NullConsumerEPR"));
+
+     //
+     // sanity check - the message handler should do this, but we handle
+     // null filters just in case
+     //
+     if (filter == null)
+         filter = PublishAllMessagesFilter.getInstance();
+
+     WsResource producer = getWsResource();
+
+     try
+     {
+         processFilters(producer, filter);
+     }
+
+     //
+     // an unsupported topic has its own declared fault - let it through
+     // as-is instead of burying it inside a SubscribeCreationFailedFault
+     //
+     catch (TopicNotSupportedFault error)
+     {
+         throw error;
+     }
+
+     catch (SoapFault error)
+     {
+         throw new SubscribeCreationFailedFault(error);
+     }
+
+     ResourceManager manager = producer.getResourceManager();
+
+     //
+     // create the resource to represent the subscription
+     //
+     String endpoint = getSubscriptionContextPath();
+     WsResource sub = null;
+
+     try
+     {
+         sub = (WsResource)manager.createResource(endpoint);
+     }
+
+     catch (SoapFault error)
+     {
+         throw new SubscribeCreationFailedFault(error);
+     }
+
+     //
+     // use the subscription capability to set all the subscription fields
+     //
+     SubscriptionManager subMgr =
+         (SubscriptionManager)sub.getCapability(WsnConstants.SUBSCRIPTION_MGR_URI);
+
+     EndpointReference producerEPR = producer.getEndpointReference();
+     subMgr.setProducerReference(producerEPR);
+
+     subMgr.setConsumerReference(consumer);
+     subMgr.setFilter(filter);
+     subMgr.setSubscriptionPolicy(policy);
+
+     try
+     {
+         sub.initialize();
+         manager.addResource(sub.getEndpointReference(), sub);
+     }
+
+     catch (SoapFault error)
+     {
+         throw new SubscribeCreationFailedFault(error);
+     }
+
+     //
+     // set termination time AFTER we initialize so that we can rely
+     // on the base Resource class' implementation of WS-RL (which
+     // tries to apply the change immediately, rather than storing it
+     // in a field and setting it during initialization)
+     //
+     if (sub.hasCapability(WsrlConstants.SCHEDULED_TERMINATION_URI))
+     {
+         ScheduledTermination wsrl = (ScheduledTermination)sub.getCapability(WsrlConstants.SCHEDULED_TERMINATION_URI);
+
+         try
+         {
+             wsrl.setTerminationTime(terminationTime);
+         }
+
+         catch (BaseFault error)
+         {
+             //
+             // the caller never receives this subscription, so don't
+             // leave it registered with the resource manager
+             //
+             removeFailedSubscription(manager, sub);
+             throw new UnacceptableInitialTerminationTimeFault(error);
+         }
+     }
+
+     try
+     {
+         addSubscription(sub);
+     }
+
+     catch (SoapFault error)
+     {
+         removeFailedSubscription(manager, sub);
+         throw new SubscribeCreationFailedFault(error);
+     }
+
+     NotificationProducerPersistence persistence = (NotificationProducerPersistence)getPersistence();
+
+     try
+     {
+         if (persistence != null)
+             persistence.resourceAdded(sub.getEndpointReference(), sub);
+     }
+
+     catch (SoapFault fault)
+     {
+         LoggingUtils.logError(getLog(), fault);
+     }
+
+     return sub;
+ }
+
+ /**
+  * Removes a subscription resource whose creation could not be completed
+  * from the resource manager. A fault raised by the removal is logged,
+  * not thrown, so that the fault describing the original failure is the
+  * one the caller sees.
+  */
+ private void removeFailedSubscription(ResourceManager manager, WsResource sub)
+ {
+     try
+     {
+         manager.removeResource(sub.getEndpointReference());
+     }
+
+     catch (SoapFault error)
+     {
+         LoggingUtils.logError(getLog(), error);
+     }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: muse-commits-unsubscribe@ws.apache.org
For additional commands, e-mail: muse-commits-help@ws.apache.org