You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pubscribe-dev@ws.apache.org by sc...@apache.org on 2005/02/03 21:32:56 UTC
svn commit: r151222 - in incubator/hermes/trunk/src:
java/org/apache/ws/notification/base/
java/org/apache/ws/notification/base/impl/
java/org/apache/ws/notification/base/v1_2/porttype/impl/
java/org/apache/ws/notification/topics/impl/ test/org/apache/
Author: scamp
Date: Thu Feb 3 12:32:53 2005
New Revision: 151222
URL: http://svn.apache.org/viewcvs?view=rev&rev=151222
Log:
Added the building of the NotificationMessage
Added:
incubator/hermes/trunk/src/test/org/apache/
Modified:
incubator/hermes/trunk/src/java/org/apache/ws/notification/base/Subscription.java
incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java
incubator/hermes/trunk/src/java/org/apache/ws/notification/base/v1_2/porttype/impl/NotificationProducerPortTypeImpl.java
incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java
Modified: incubator/hermes/trunk/src/java/org/apache/ws/notification/base/Subscription.java
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/notification/base/Subscription.java?view=diff&r1=151221&r2=151222
==============================================================================
--- incubator/hermes/trunk/src/java/org/apache/ws/notification/base/Subscription.java (original)
+++ incubator/hermes/trunk/src/java/org/apache/ws/notification/base/Subscription.java Thu Feb 3 12:32:53 2005
@@ -19,7 +19,11 @@
import org.apache.ws.resource.properties.query.QueryExpression;
import org.apache.ws.resource.lifetime.ScheduledResourceTerminationResource;
import org.apache.ws.notification.topics.TopicExpression;
+import org.apache.ws.notification.topics.impl.SimpleSubscriptionTopicListener;
+import org.apache.ws.notification.base.v1_2.porttype.impl.NotificationProducerPortTypeImpl;
import org.apache.ws.addressing.EndpointReference;
+import org.apache.ws.pubsub.NotificationConsumer;
+import org.apache.ws.pubsub.NotificationProducer;
/**
* Interface to be implemented by subscription resources. Exposes the information
@@ -67,4 +71,10 @@
void setEpr(EndpointReference epr);
EndpointReference getEpr();
+
+ void setNotificationConsumer(NotificationConsumer notificationConsumer);
+
+ void setNotificationProducer(NotificationProducer notificationProducer);
+
+ EndpointReference getConsumerReference();
}
Modified: incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java?view=diff&r1=151221&r2=151222
==============================================================================
--- incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java (original)
+++ incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java Thu Feb 3 12:32:53 2005
@@ -75,6 +75,8 @@
private static final UuidGenerator UUID_GEN =
UuidGeneratorFactory.createUUIdGenerator();
+ private NotificationConsumer m_notificationConsumer;
+ private NotificationProducer m_notificationProducer;
public ResourcePropertySet getResourcePropertySet()
{
@@ -311,12 +313,12 @@
public NotificationConsumer getNotificationConsumer()
{
- return null;
+ return m_notificationConsumer;
}
public NotificationProducer getNotificationProducer()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return m_notificationProducer;
}
public EndpointReference getEpr()
@@ -328,4 +330,15 @@
{
m_epr = epr;
}
+
+ public void setNotificationConsumer(NotificationConsumer notificationConsumer)
+ {
+ m_notificationConsumer = notificationConsumer;
+ }
+
+ public void setNotificationProducer(NotificationProducer notificationProducer)
+ {
+ m_notificationProducer = notificationProducer;
+ }
+
}
Modified: incubator/hermes/trunk/src/java/org/apache/ws/notification/base/v1_2/porttype/impl/NotificationProducerPortTypeImpl.java
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/notification/base/v1_2/porttype/impl/NotificationProducerPortTypeImpl.java?view=diff&r1=151221&r2=151222
==============================================================================
--- incubator/hermes/trunk/src/java/org/apache/ws/notification/base/v1_2/porttype/impl/NotificationProducerPortTypeImpl.java (original)
+++ incubator/hermes/trunk/src/java/org/apache/ws/notification/base/v1_2/porttype/impl/NotificationProducerPortTypeImpl.java Thu Feb 3 12:32:53 2005
@@ -36,6 +36,10 @@
import org.apache.ws.addressing.XmlBeansEndpointReference;
import org.apache.ws.addressing.EndpointReference;
import org.apache.ws.XmlObjectWrapper;
+import org.apache.ws.pubsub.NotificationProducer;
+import org.apache.ws.pubsub.Filter;
+import org.apache.ws.pubsub.NotificationConsumer;
+import org.apache.ws.pubsub.DeliveryMode;
import org.apache.xmlbeans.XmlObject;
import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.GetCurrentMessageDocument;
import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.GetCurrentMessageResponseDocument;
@@ -61,7 +65,7 @@
* @author Ian Springer (ian DOT springer AT hp DOT com)
*/
public class NotificationProducerPortTypeImpl extends AbstractResourcePropertiesPortType
- implements NotificationProducerPortType
+ implements NotificationProducerPortType, NotificationProducer
{
public NotificationProducerPortTypeImpl( ResourceContext resourceContext )
@@ -112,15 +116,16 @@
SubscriptionHome subscriptionHome = (SubscriptionHome) initialContext.lookup( SubscriptionHome.HOME_LOCATION );
Subscription subscription = subscriptionHome.create(Subscription1_2Resource.class, new XmlBeansEndpointReference(consumerEPR), producerEPR, initialTerminationTime,subPolicy, precondition, selector,getResourceKey(), ((AbstractResourceContext)getResourceContext()).getResourceHomeLocation(),topicExpr,useNotify );
+ subscription.setNotificationProducer(this);
epr = (EndpointReferenceType) ((XmlObjectWrapper)subscription.getEpr()).getXmlObject();
Collection collection = evaluateTopicExpression(topicExpr);
-
+ SimpleSubscriptionTopicListener simpleSubscriptionTopicListener = new SimpleSubscriptionTopicListener(subscription);
//add listeners
for (Iterator iterator = collection.iterator(); iterator.hasNext();)
{
Topic topic = (Topic) iterator.next();
- topic.addTopicListener(new SimpleSubscriptionTopicListener(subscription));
+ topic.addTopicListener(simpleSubscriptionTopicListener);
}
}
catch (Exception e)
@@ -205,5 +210,41 @@
//get last message from topic
// TODO
return message;
+ }
+
+ /**
+ * Returns this producer's endpoint reference.
+ *
+ * @return this producer's endpoint reference
+ */
+ public EndpointReference getEPR()
+ {
+ return buildEPR(getResourceContext());
+ }
+
+ /**
+ * Subscribe to notifications from this producer.
+ *
+ * @param notificationConsumer
+ * @param filters
+ * @param initialTerminationTime
+ * @param deliveryMode the notification delivery mode, or null to use default mode
+ * @param policy a policy to be associated with the subscription, or null if no policy should be used
+ * @return the subscription
+ */
+ public org.apache.ws.pubsub.Subscription subscribe(NotificationConsumer notificationConsumer, Filter filters[], Calendar initialTerminationTime, DeliveryMode deliveryMode, Object policy)
+ {
+ return null;
+ }
+
+ /**
+ * Returns the last notification message published for the given set of filters.
+ *
+ * @param filters
+ * @return
+ */
+ public Object getCurrentMessage(Filter filters[])
+ {
+ return null;
}
}
Modified: incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java?view=diff&r1=151221&r2=151222
==============================================================================
--- incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java (original)
+++ incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java Thu Feb 3 12:32:53 2005
@@ -1,262 +1,332 @@
-/*
- * Copyright 2004 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ws.notification.topics.impl;
-
-
-import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.ws.XmlObjectWrapper;
-import org.apache.ws.addressing.Addressing_03_2003_Constants;
-import org.apache.ws.addressing.EndpointReference;
-import org.apache.ws.notification.base.Subscription;
-import org.apache.ws.notification.base.v1_2.BaseNotification1_2Constants;
-import org.apache.ws.notification.topics.Topic;
-import org.apache.ws.notification.topics.TopicExpression;
-import org.apache.ws.notification.topics.TopicExpressionEngine;
-import org.apache.ws.notification.topics.TopicExpressionEvaluator;
-import org.apache.ws.notification.topics.TopicListener;
-import org.apache.ws.notification.topics.topicexpression.impl.TopicExpressionException;
-import org.apache.ws.pubsub.NotificationConsumer;
-import org.apache.ws.pubsub.emitter.EmitterTask;
-import org.apache.ws.util.XmlBeanUtils;
-import org.apache.ws.util.thread.NamedThread;
-import org.apache.xmlbeans.XmlCursor;
-import org.apache.xmlbeans.XmlObject;
-import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.NotificationMessageHolderType;
-import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.NotifyDocument;
-import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.TopicExpressionType;
-import org.w3c.dom.Document;
-import org.xmlsoap.schemas.ws.x2003.x03.addressing.EndpointReferenceType;
-import org.xmlsoap.schemas.ws.x2003.x03.addressing.ReferencePropertiesType;
-
-import javax.xml.soap.MessageFactory;
-import javax.xml.soap.SOAPBody;
-import javax.xml.soap.SOAPElement;
-import javax.xml.soap.SOAPEnvelope;
-import javax.xml.soap.SOAPFactory;
-import javax.xml.soap.SOAPHeader;
-import javax.xml.soap.SOAPHeaderElement;
-import javax.xml.soap.SOAPMessage;
-import java.io.Serializable;
-import java.net.URL;
-import java.util.List;
-
-/**
- * Topic listener implementation that will trigger notifications when a topic
- * changes. To be used in conjunction with the SimpleSubscription class.
- *
- * @see SimpleSubscription
- */
-public class SimpleSubscriptionTopicListener implements TopicListener,
- Serializable
-{
-
- private static Log logger =
- LogFactory.getLog(SimpleSubscriptionTopicListener.class.getName());
-
- private transient NotificationConsumer consumerPort = null;
-
- private Subscription m_subscription;
-
- // the thread pool used to emit notifications
- private static final PooledExecutor EMITTER_POOL;
-
- static
- {
- EMITTER_POOL = new PooledExecutor( 100 );
-
- // make sure the threads are non-daemon threads so they have time to complete even if the JVM wants to shut down
- EMITTER_POOL.setThreadFactory( new NamedThread.ConcurrentThreadFactory( "notifmgr-emitter", false ) );
- }
-
- /**
- * Construct a listener instance.
- *
- * @param Subscription the subscription which is being wrapped in a listener
- */
- public SimpleSubscriptionTopicListener(
- Subscription subscription)
- {
- this.m_subscription = subscription;
- }
-
- public void topicChanged(Topic topic)
- {
- Subscription subscription = m_subscription;
-
- if (subscription != null) {
- try
- {
- this.notify(subscription,
- topic.getTopicPath(),
- topic.getCurrentMessage());
- }
- catch(Exception e)
- {
- logger.warn("notificationFailed"+ subscription);
- logger.debug("", e);
- }
- }
- }
-
- public void topicAdded(Topic topic)
- { //todo
- }
-
- public void topicRemoved(Topic topic)
- { //todo
- }
-
-
-
- /**
- * @return Subscription
- */
- public Subscription getSubscription() {
- return this.m_subscription;
- }
-
-
- /**
- * Send a notification
- *
- * @param subscription The subscription for which to send the notification
- * @param topicPath The topic path of the topic that caused the
- * notification
- * @param msg The new value of the topic
- * @throws Exception
- */
- protected void notify(
- Subscription subscription, List topicPath, Object msg)
- throws Exception
- {
-
- synchronized(subscription)
- {
- if(!subscription.isPaused())
- {
- System.out.println("Notification received for subscription with Id " + subscription.getID() + "; value: " + msg);
-
- EndpointReference epr = subscription.getNotificationConsumer().getEPR();
-
- if(subscription.getUseNotify())
- {
- NotifyDocument notifyDoc = NotifyDocument.Factory.newInstance();
- NotifyDocument.Notify notify = notifyDoc.addNewNotify();
- NotificationMessageHolderType notificationMessageHolderType = notify.addNewNotificationMessage();
-
- //assumes xmlobject for msg...this needs to change
- notificationMessageHolderType.setMessage((XmlObject) msg);
-
- //set the producer ref
- notificationMessageHolderType.setProducerReference((EndpointReferenceType) ((XmlObjectWrapper)subscription.getNotificationProducer().getEPR()).getXmlObject());
-
- TopicExpressionEngine engine =
- TopicExpressionEngineImpl.getInstance();
- TopicExpression topicExpressionIntf = subscription.getTopicExpression();
-
- TopicExpressionType tp = (TopicExpressionType) ((XmlObjectWrapper)topicExpressionIntf).getXmlObject();
-
- String dialect = topicExpressionIntf.getDialect().toString();
-
- TopicExpressionEvaluator evaluator =
- engine.getEvaluator(dialect);
-
- //TopicExpression
- TopicExpression topicExpression = null;
- try
- {
- topicExpression = (TopicExpression) evaluator.toTopicExpression(
- topicPath);
- }
- catch (TopicExpressionException e)
- {
- //todo do something
- }
-
- notificationMessageHolderType.setTopic((TopicExpressionType) ((XmlObjectWrapper)topicExpression).getXmlObject());
- //notify is now populated
-
- //build soap mesage
- SOAPMessage soapMessage = buildSOAPMessage(notifyDoc, (EndpointReferenceType)((XmlObjectWrapper)epr).getXmlObject());
-
- EMITTER_POOL.execute(EmitterTask.createEmitterTask(soapMessage, new URL(epr.getAddress().toString())));
-
- }
- else
- {
- // TODO: raw notifications ///could we build the same as above and simply strip off the notify?? not sure
- }
- }
- }
- }
- private SOAPMessage buildSOAPMessage( XmlObject fullMsgBodyElem,
- EndpointReferenceType consumerEPR )
- throws Exception
- {
- SOAPMessage msg = MessageFactory.newInstance( ).createMessage( );
- SOAPEnvelope envelope = msg.getSOAPPart( ).getEnvelope( );
- SOAPBody body = envelope.getBody( );
- body.addDocument( (Document)fullMsgBodyElem.newDomNode() );
- SOAPHeader header = msg.getSOAPHeader( );
- addWSAHeaders( header, consumerEPR );
-
- return msg;
- }
-
- /*
- * Add WS-Addressing headers to a notification.
- *
- * @param header - The header to which to add the WS-Addressing headers.
- * @param consumerEPR - An endpointReferece to the consumer of this notification.
- * @throws Exception
- */
- private void addWSAHeaders( SOAPHeader header,
- EndpointReferenceType consumerEPR )
- throws Exception
- {
- SOAPFactory factory = SOAPFactory.newInstance( );
-
- SOAPHeaderElement h = header.addHeaderElement( factory.createName( Addressing_03_2003_Constants.TO,
- Addressing_03_2003_Constants.NSPREFIX_WSRL_SCHEMA,
- Addressing_03_2003_Constants.NSURI_WSRL_SCHEMA ) );
-
- h.addTextNode( consumerEPR.getAddress( ).getStringValue( ) );
-
- h = header.addHeaderElement( factory.createName( Addressing_03_2003_Constants.ACTION ,
- Addressing_03_2003_Constants.NSPREFIX_WSRL_SCHEMA
- , Addressing_03_2003_Constants.NSURI_WSRL_SCHEMA ) );
- h.addTextNode( BaseNotification1_2Constants.NOTIFY_ACTION_URL );
-
- ReferencePropertiesType props = consumerEPR.getReferenceProperties( );
- XmlCursor cursor = props.newCursor( );
-
- boolean haveChild = cursor.toFirstChild( );
-
- while ( haveChild )
- {
- SOAPElement e = XmlBeanUtils.toSOAPElement( cursor.getObject( ) );
- h = header.addHeaderElement( e.getElementName( ) );
- h.addTextNode( e.getValue( ) );
-
- haveChild = cursor.toNextSibling( );
- }
-
- cursor.dispose( );
- }
-
-}
+/*=============================================================================*
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *=============================================================================*/
+package org.apache.ws.notification.topics.impl;
+
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ws.XmlObjectWrapper;
+import org.apache.ws.addressing.Addressing_03_2003_Constants;
+import org.apache.ws.addressing.EndpointReference;
+import org.apache.ws.notification.base.Subscription;
+import org.apache.ws.notification.base.v1_2.BaseNotification1_2Constants;
+import org.apache.ws.notification.topics.Topic;
+import org.apache.ws.notification.topics.TopicExpression;
+import org.apache.ws.notification.topics.TopicExpressionEngine;
+import org.apache.ws.notification.topics.TopicExpressionEvaluator;
+import org.apache.ws.notification.topics.TopicListener;
+import org.apache.ws.notification.topics.topicexpression.impl.TopicExpressionException;
+import org.apache.ws.pubsub.NotificationConsumer;
+import org.apache.ws.pubsub.emitter.EmitterTask;
+import org.apache.ws.util.XmlBeanUtils;
+import org.apache.ws.util.JaxpUtils;
+import org.apache.ws.util.thread.NamedThread;
+import org.apache.xmlbeans.XmlCursor;
+import org.apache.xmlbeans.XmlObject;
+import org.apache.xmlbeans.XmlOptions;
+import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.NotificationMessageHolderType;
+import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.NotifyDocument;
+import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.TopicExpressionType;
+import org.w3c.dom.Document;
+import org.w3c.dom.Node;
+import org.w3c.dom.DOMException;
+import org.xmlsoap.schemas.ws.x2003.x03.addressing.EndpointReferenceType;
+import org.xmlsoap.schemas.ws.x2003.x03.addressing.ReferencePropertiesType;
+import org.xml.sax.SAXException;
+
+import javax.xml.soap.MessageFactory;
+import javax.xml.soap.SOAPBody;
+import javax.xml.soap.SOAPElement;
+import javax.xml.soap.SOAPEnvelope;
+import javax.xml.soap.SOAPFactory;
+import javax.xml.soap.SOAPHeader;
+import javax.xml.soap.SOAPHeaderElement;
+import javax.xml.soap.SOAPMessage;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import java.io.Serializable;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URL;
+import java.util.List;
+
+/**
+ * Topic listener implementation that will trigger notifications when a topic
+ * changes. To be used in conjunction with the SimpleSubscription class.
+ *
+ * @see SimpleSubscription
+ */
+public class SimpleSubscriptionTopicListener
+ implements TopicListener,
+ NotificationConsumer,
+ Serializable
+{
+ private static Log logger = LogFactory.getLog( SimpleSubscriptionTopicListener.class.getName( ) );
+
+ // the thread pool used to emit notifications
+ private static final PooledExecutor EMITTER_POOL;
+
+ static
+ {
+ EMITTER_POOL = new PooledExecutor( 100 );
+
+ // make sure the threads are non-daemon threads so they have time to complete even if the JVM wants to shut down
+ EMITTER_POOL.setThreadFactory( new NamedThread.ConcurrentThreadFactory( "notifmgr-emitter", false ) );
+ }
+
+ private Subscription m_subscription;
+ private EndpointReference m_epr;
+
+ /**
+ * Construct a listener instance.
+ *
+ * @param Subscription the subscription which is being wrapped in a listener
+ */
+ public SimpleSubscriptionTopicListener( Subscription subscription )
+ {
+ this.m_subscription = subscription;
+ subscription.setNotificationConsumer( this );
+ m_epr = subscription.getConsumerReference( );
+ }
+
+ /**
+ * DOCUMENT_ME
+ *
+ * @return DOCUMENT_ME
+ */
+ public EndpointReference getEPR( )
+ {
+ return m_epr;
+ }
+
+ /**
+ * what's this for??
+ *
+ * @return
+ */
+ public int getMode( )
+ {
+ return 0;
+ }
+
+ /**
+ * @return Subscription
+ */
+ public Subscription getSubscription( )
+ {
+ return this.m_subscription;
+ }
+
+ /**
+ * DOCUMENT_ME
+ *
+ * @param subscription DOCUMENT_ME
+ * @param status DOCUMENT_ME
+ * @param reason DOCUMENT_ME
+ */
+ public void end( org.apache.ws.pubsub.Subscription subscription,
+ URI status,
+ String reason )
+ {
+ }
+
+ /**
+ * DOCUMENT_ME
+ *
+ * @param subscription DOCUMENT_ME
+ * @param message DOCUMENT_ME
+ */
+ public void notify( org.apache.ws.pubsub.Subscription subscription,
+ Object message )
+ {
+ try
+ {
+ notify( (Subscription) subscription, message );
+ }
+ catch ( Exception e )
+ {
+ throw new RuntimeException( e );
+ }
+ }
+
+ /**
+ * Send a notification
+ *
+ * @param subscription The subscription for which to send the notification
+ * @param msg The new value of the topic
+ * @throws Exception
+ */
+ public void notify( Subscription subscription,
+ Object msg )
+ throws Exception
+ {
+ synchronized ( subscription )
+ {
+ if ( !subscription.isPaused( ) )
+ {
+ System.out.println( "Notification received for subscription with Id " + subscription.getID( )
+ + "; value: " + msg );
+
+ EndpointReference epr =
+ subscription.getNotificationConsumer( ).getEPR( );
+ NotifyDocument notifyDoc = NotifyDocument.Factory.newInstance( );
+ NotifyDocument.Notify notify = notifyDoc.addNewNotify( );
+ NotificationMessageHolderType notificationMessageHolderType = notify.addNewNotificationMessage( );
+
+ //assumes xmlobject for msg...this needs to change
+ notificationMessageHolderType.setMessage( (XmlObject) msg );
+
+ //set the producer ref
+ notificationMessageHolderType.setProducerReference( (EndpointReferenceType) ( (XmlObjectWrapper) subscription.getNotificationProducer( )
+ .getEPR( ) )
+ .getXmlObject( ) );
+
+ TopicExpression topicExpressionIntf = subscription.getTopicExpression( );
+ TopicExpressionType tp =
+ (TopicExpressionType) ( (XmlObjectWrapper) topicExpressionIntf ).getXmlObject( );
+ notificationMessageHolderType.setTopic( tp );
+ Document document = toDocument(subscription, notifyDoc);
+
+ SOAPMessage soapMessage =
+ buildSOAPMessage( document, (EndpointReferenceType) ( (XmlObjectWrapper) epr ).getXmlObject( ) );
+
+ EMITTER_POOL.execute( EmitterTask.createEmitterTask( soapMessage,
+ new URL( epr.getAddress( ).toString( ) ) ) );
+ }
+ }
+ }
+
+ private Document toDocument(Subscription subscription, NotifyDocument notifyDoc)
+ throws ParserConfigurationException, SAXException, IOException
+ {
+ Document document = null;
+ //notify is now populated
+ if (subscription.getUseNotify( ))
+ {
+ document = (Document) notifyDoc.newDomNode();
+ }
+ else
+ {
+ NotificationMessageHolderType notificationMessageArray = notifyDoc.getNotify().getNotificationMessageArray(0);
+ String s = notificationMessageArray.xmlText(new XmlOptions().setSaveOuter());
+ document = JaxpUtils.toDocument( s );
+ }
+ return document;
+ }
+
+ /**
+ * DOCUMENT_ME
+ *
+ * @param topic DOCUMENT_ME
+ */
+ public void topicAdded( Topic topic )
+ {
+ //todo
+ }
+
+ /**
+ * DOCUMENT_ME
+ *
+ * @param topic DOCUMENT_ME
+ */
+ public void topicChanged( Topic topic )
+ {
+ Subscription subscription = m_subscription;
+
+ if ( subscription != null )
+ {
+ try
+ {
+ this.notify( subscription,
+ topic.getCurrentMessage( ) );
+ }
+ catch ( Exception e )
+ {
+ logger.warn( "notificationFailed" + subscription );
+ logger.debug( "", e );
+ }
+ }
+ }
+
+ /**
+ * DOCUMENT_ME
+ *
+ * @param topic DOCUMENT_ME
+ */
+ public void topicRemoved( Topic topic )
+ {
+ //todo
+ }
+
+ /*
+ * Add WS-Addressing headers to a notification.
+ *
+ * @param header - The header to which to add the WS-Addressing headers.
+ * @param consumerEPR - An endpointReferece to the consumer of this notification.
+ * @throws Exception
+ */
+ private void addWSAHeaders( SOAPHeader header,
+ EndpointReferenceType consumerEPR )
+ throws Exception
+ {
+ SOAPFactory factory = SOAPFactory.newInstance( );
+
+ SOAPHeaderElement h =
+ header.addHeaderElement( factory.createName( Addressing_03_2003_Constants.TO,
+ Addressing_03_2003_Constants.NSPREFIX_WSRL_SCHEMA,
+ Addressing_03_2003_Constants.NSURI_WSRL_SCHEMA ) );
+
+ h.addTextNode( consumerEPR.getAddress( ).getStringValue( ) );
+
+ h = header.addHeaderElement( factory.createName( Addressing_03_2003_Constants.ACTION,
+ Addressing_03_2003_Constants.NSPREFIX_WSRL_SCHEMA,
+ Addressing_03_2003_Constants.NSURI_WSRL_SCHEMA ) );
+ h.addTextNode( BaseNotification1_2Constants.NOTIFY_ACTION_URL );
+
+ ReferencePropertiesType props = consumerEPR.getReferenceProperties( );
+ if ( props != null )
+ {
+ XmlCursor cursor = props.newCursor( );
+
+ boolean haveChild = cursor.toFirstChild( );
+
+ while ( haveChild )
+ {
+ SOAPElement e = XmlBeanUtils.toSOAPElement( cursor.getObject( ) );
+ h = header.addHeaderElement( e.getElementName( ) );
+ h.addTextNode( e.getValue( ) );
+
+ haveChild = cursor.toNextSibling( );
+ }
+
+ cursor.dispose( );
+ }
+ }
+
+ private SOAPMessage buildSOAPMessage( Document fullMsgBodyElem,
+ EndpointReferenceType consumerEPR )
+ throws Exception
+ {
+ SOAPMessage msg = MessageFactory.newInstance( ).createMessage( );
+ SOAPEnvelope envelope = msg.getSOAPPart( ).getEnvelope( );
+ SOAPBody body = envelope.getBody( );
+ body.addDocument(fullMsgBodyElem);
+ SOAPHeader header = msg.getSOAPHeader( );
+ addWSAHeaders( header, consumerEPR );
+
+ return msg;
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: hermes-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: hermes-dev-help@ws.apache.org