You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pubscribe-dev@ws.apache.org by sc...@apache.org on 2005/02/03 21:32:56 UTC

svn commit: r151222 - in incubator/hermes/trunk/src: java/org/apache/ws/notification/base/ java/org/apache/ws/notification/base/impl/ java/org/apache/ws/notification/base/v1_2/porttype/impl/ java/org/apache/ws/notification/topics/impl/ test/org/apache/

Author: scamp
Date: Thu Feb  3 12:32:53 2005
New Revision: 151222

URL: http://svn.apache.org/viewcvs?view=rev&rev=151222
Log:
Added the building of the NotificationMessage

Added:
    incubator/hermes/trunk/src/test/org/apache/
Modified:
    incubator/hermes/trunk/src/java/org/apache/ws/notification/base/Subscription.java
    incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java
    incubator/hermes/trunk/src/java/org/apache/ws/notification/base/v1_2/porttype/impl/NotificationProducerPortTypeImpl.java
    incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java

Modified: incubator/hermes/trunk/src/java/org/apache/ws/notification/base/Subscription.java
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/notification/base/Subscription.java?view=diff&r1=151221&r2=151222
==============================================================================
--- incubator/hermes/trunk/src/java/org/apache/ws/notification/base/Subscription.java (original)
+++ incubator/hermes/trunk/src/java/org/apache/ws/notification/base/Subscription.java Thu Feb  3 12:32:53 2005
@@ -19,7 +19,11 @@
 import org.apache.ws.resource.properties.query.QueryExpression;
 import org.apache.ws.resource.lifetime.ScheduledResourceTerminationResource;
 import org.apache.ws.notification.topics.TopicExpression;
+import org.apache.ws.notification.topics.impl.SimpleSubscriptionTopicListener;
+import org.apache.ws.notification.base.v1_2.porttype.impl.NotificationProducerPortTypeImpl;
 import org.apache.ws.addressing.EndpointReference;
+import org.apache.ws.pubsub.NotificationConsumer;
+import org.apache.ws.pubsub.NotificationProducer;
 
 /**
  * Interface to be implemented by subscription resources. Exposes the information
@@ -67,4 +71,10 @@
     void setEpr(EndpointReference epr);
 
     EndpointReference getEpr();
+
+    void setNotificationConsumer(NotificationConsumer notificationConsumer);
+
+    void setNotificationProducer(NotificationProducer notificationProducer);
+
+    EndpointReference getConsumerReference();
 }

Modified: incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java?view=diff&r1=151221&r2=151222
==============================================================================
--- incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java (original)
+++ incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java Thu Feb  3 12:32:53 2005
@@ -75,6 +75,8 @@
 
     private static final UuidGenerator UUID_GEN =
             UuidGeneratorFactory.createUUIdGenerator();
+    private NotificationConsumer m_notificationConsumer;
+    private NotificationProducer m_notificationProducer;
 
     public ResourcePropertySet getResourcePropertySet()
     {
@@ -311,12 +313,12 @@
 
     public NotificationConsumer getNotificationConsumer()
     {
-        return null;
+        return m_notificationConsumer;
     }
 
     public NotificationProducer getNotificationProducer()
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        return m_notificationProducer;
     }
 
     public EndpointReference getEpr()
@@ -328,4 +330,15 @@
     {
         m_epr = epr;
     }
+
+    public void setNotificationConsumer(NotificationConsumer notificationConsumer)
+    {
+        m_notificationConsumer = notificationConsumer;
+    }
+
+    public void setNotificationProducer(NotificationProducer notificationProducer)
+    {
+        m_notificationProducer = notificationProducer;
+    }
+    
 }

Modified: incubator/hermes/trunk/src/java/org/apache/ws/notification/base/v1_2/porttype/impl/NotificationProducerPortTypeImpl.java
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/notification/base/v1_2/porttype/impl/NotificationProducerPortTypeImpl.java?view=diff&r1=151221&r2=151222
==============================================================================
--- incubator/hermes/trunk/src/java/org/apache/ws/notification/base/v1_2/porttype/impl/NotificationProducerPortTypeImpl.java (original)
+++ incubator/hermes/trunk/src/java/org/apache/ws/notification/base/v1_2/porttype/impl/NotificationProducerPortTypeImpl.java Thu Feb  3 12:32:53 2005
@@ -36,6 +36,10 @@
 import org.apache.ws.addressing.XmlBeansEndpointReference;
 import org.apache.ws.addressing.EndpointReference;
 import org.apache.ws.XmlObjectWrapper;
+import org.apache.ws.pubsub.NotificationProducer;
+import org.apache.ws.pubsub.Filter;
+import org.apache.ws.pubsub.NotificationConsumer;
+import org.apache.ws.pubsub.DeliveryMode;
 import org.apache.xmlbeans.XmlObject;
 import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.GetCurrentMessageDocument;
 import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.GetCurrentMessageResponseDocument;
@@ -61,7 +65,7 @@
  * @author Ian Springer (ian DOT springer AT hp DOT com)
  */
 public class NotificationProducerPortTypeImpl extends AbstractResourcePropertiesPortType
-        implements NotificationProducerPortType
+        implements NotificationProducerPortType, NotificationProducer
 {
 
     public NotificationProducerPortTypeImpl( ResourceContext resourceContext )
@@ -112,15 +116,16 @@
             SubscriptionHome subscriptionHome = (SubscriptionHome) initialContext.lookup( SubscriptionHome.HOME_LOCATION );
           
             Subscription subscription = subscriptionHome.create(Subscription1_2Resource.class, new XmlBeansEndpointReference(consumerEPR), producerEPR, initialTerminationTime,subPolicy, precondition, selector,getResourceKey(), ((AbstractResourceContext)getResourceContext()).getResourceHomeLocation(),topicExpr,useNotify );
+            subscription.setNotificationProducer(this);
             epr = (EndpointReferenceType) ((XmlObjectWrapper)subscription.getEpr()).getXmlObject();
 
             Collection collection = evaluateTopicExpression(topicExpr);
-
+            SimpleSubscriptionTopicListener simpleSubscriptionTopicListener = new SimpleSubscriptionTopicListener(subscription);
             //add listeners
             for (Iterator iterator = collection.iterator(); iterator.hasNext();)
             {
                 Topic topic = (Topic) iterator.next();
-                topic.addTopicListener(new SimpleSubscriptionTopicListener(subscription));
+                topic.addTopicListener(simpleSubscriptionTopicListener);
             }
         }
         catch (Exception e)
@@ -205,5 +210,41 @@
                 //get last message from topic
         // TODO
         return message;
+    }
+
+    /**
+     * Returns this producer's endpoint reference.
+     *
+     * @return this producer's endpoint reference
+     */
+    public EndpointReference getEPR()
+    {
+        return buildEPR(getResourceContext());
+    }
+
+    /**
+     * Subscribe to notifications from this producer.
+     *
+     * @param notificationConsumer
+     * @param filters
+     * @param initialTerminationTime
+     * @param deliveryMode           the notification delivery mode, or null to use default mode
+     * @param policy                 a policy to be associated with the subscription, or null if no policy should be used
+     * @return the subscription
+     */
+    public org.apache.ws.pubsub.Subscription subscribe(NotificationConsumer notificationConsumer, Filter filters[], Calendar initialTerminationTime, DeliveryMode deliveryMode, Object policy)
+    {
+        return null;
+    }
+
+    /**
+     * Returns the last notification message published for the given set of filters.
+     *
+     * @param filters
+     * @return
+     */
+    public Object getCurrentMessage(Filter filters[])
+    {
+        return null;
     }
 }

Modified: incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java?view=diff&r1=151221&r2=151222
==============================================================================
--- incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java (original)
+++ incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java Thu Feb  3 12:32:53 2005
@@ -1,262 +1,332 @@
-/*
- *  Copyright 2004 The Apache Software Foundation
- *
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.ws.notification.topics.impl;
-
-
-import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.ws.XmlObjectWrapper;
-import org.apache.ws.addressing.Addressing_03_2003_Constants;
-import org.apache.ws.addressing.EndpointReference;
-import org.apache.ws.notification.base.Subscription;
-import org.apache.ws.notification.base.v1_2.BaseNotification1_2Constants;
-import org.apache.ws.notification.topics.Topic;
-import org.apache.ws.notification.topics.TopicExpression;
-import org.apache.ws.notification.topics.TopicExpressionEngine;
-import org.apache.ws.notification.topics.TopicExpressionEvaluator;
-import org.apache.ws.notification.topics.TopicListener;
-import org.apache.ws.notification.topics.topicexpression.impl.TopicExpressionException;
-import org.apache.ws.pubsub.NotificationConsumer;
-import org.apache.ws.pubsub.emitter.EmitterTask;
-import org.apache.ws.util.XmlBeanUtils;
-import org.apache.ws.util.thread.NamedThread;
-import org.apache.xmlbeans.XmlCursor;
-import org.apache.xmlbeans.XmlObject;
-import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.NotificationMessageHolderType;
-import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.NotifyDocument;
-import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.TopicExpressionType;
-import org.w3c.dom.Document;
-import org.xmlsoap.schemas.ws.x2003.x03.addressing.EndpointReferenceType;
-import org.xmlsoap.schemas.ws.x2003.x03.addressing.ReferencePropertiesType;
-
-import javax.xml.soap.MessageFactory;
-import javax.xml.soap.SOAPBody;
-import javax.xml.soap.SOAPElement;
-import javax.xml.soap.SOAPEnvelope;
-import javax.xml.soap.SOAPFactory;
-import javax.xml.soap.SOAPHeader;
-import javax.xml.soap.SOAPHeaderElement;
-import javax.xml.soap.SOAPMessage;
-import java.io.Serializable;
-import java.net.URL;
-import java.util.List;
-
-/**
- * Topic listener implementation that will trigger notifications when a topic
- * changes. To be used in conjunction with the SimpleSubscription class.
- *
- * @see SimpleSubscription
- */
-public class SimpleSubscriptionTopicListener implements TopicListener,
-                                                        Serializable
-{
-
-    private static Log logger =
-        LogFactory.getLog(SimpleSubscriptionTopicListener.class.getName());
-
-    private transient NotificationConsumer consumerPort = null;
-
-    private Subscription m_subscription;
-
-     // the thread pool used to emit notifications
-   private static final PooledExecutor EMITTER_POOL;
-
-    static
-   {
-      EMITTER_POOL = new PooledExecutor( 100 );
-
-      // make sure the threads are non-daemon threads so they have time to complete even if the JVM wants to shut down
-      EMITTER_POOL.setThreadFactory( new NamedThread.ConcurrentThreadFactory( "notifmgr-emitter", false ) );
-   }
-
-    /**
-     * Construct a listener instance.
-     *
-     * @param Subscription the subscription which is being wrapped in a listener
-     */
-    public SimpleSubscriptionTopicListener(
-        Subscription subscription)
-    {
-        this.m_subscription = subscription;
-    }
-
-    public void topicChanged(Topic topic)
-    {
-        Subscription subscription = m_subscription;
-
-        if (subscription != null) {
-            try
-            {
-                this.notify(subscription,
-                            topic.getTopicPath(),
-                            topic.getCurrentMessage());
-            }
-            catch(Exception e)
-            {
-                logger.warn("notificationFailed"+    subscription);
-                logger.debug("", e);
-            }
-        }
-    }
-
-    public void topicAdded(Topic topic)
-    {         //todo
-    }
-
-    public void topicRemoved(Topic topic)
-    {        //todo
-    }
-
-
-
-    /**
-     * @return Subscription
-     */
-    public Subscription getSubscription() {
-        return this.m_subscription;
-    }
-
-
-    /**
-     * Send a notification
-     *
-     * @param subscription The subscription for which to send the notification
-     * @param topicPath    The topic path of the topic that caused the
-     *                     notification
-     * @param msg     The new value of the topic
-     * @throws Exception
-     */
-    protected void notify(
-        Subscription subscription, List topicPath, Object msg)
-        throws Exception
-    {
-
-        synchronized(subscription)
-        {
-            if(!subscription.isPaused())
-            {
-                System.out.println("Notification received for subscription with Id " + subscription.getID() + "; value: " + msg);
-
-                EndpointReference epr = subscription.getNotificationConsumer().getEPR();
-               
-                if(subscription.getUseNotify())
-                {
-                    NotifyDocument notifyDoc = NotifyDocument.Factory.newInstance();
-                    NotifyDocument.Notify notify = notifyDoc.addNewNotify();
-                    NotificationMessageHolderType notificationMessageHolderType = notify.addNewNotificationMessage();
-
-                    //assumes xmlobject for msg...this needs to change
-                    notificationMessageHolderType.setMessage((XmlObject) msg);
-
-                    //set the producer ref
-                    notificationMessageHolderType.setProducerReference((EndpointReferenceType) ((XmlObjectWrapper)subscription.getNotificationProducer().getEPR()).getXmlObject());
-
-                    TopicExpressionEngine engine =
-                        TopicExpressionEngineImpl.getInstance();
-                    TopicExpression topicExpressionIntf = subscription.getTopicExpression();
-
-                    TopicExpressionType tp = (TopicExpressionType) ((XmlObjectWrapper)topicExpressionIntf).getXmlObject();
-
-                    String dialect = topicExpressionIntf.getDialect().toString();
-
-                    TopicExpressionEvaluator evaluator =
-                        engine.getEvaluator(dialect);
-
-                    //TopicExpression
-                    TopicExpression topicExpression = null;
-                    try
-                    {
-                        topicExpression = (TopicExpression) evaluator.toTopicExpression(
-                                                    topicPath);
-                    }
-                    catch (TopicExpressionException e)
-                    {
-                         //todo do something
-                    }
-
-                    notificationMessageHolderType.setTopic((TopicExpressionType) ((XmlObjectWrapper)topicExpression).getXmlObject());
-                    //notify is now populated
-
-                    //build soap mesage
-                    SOAPMessage soapMessage = buildSOAPMessage(notifyDoc, (EndpointReferenceType)((XmlObjectWrapper)epr).getXmlObject());
-
-                    EMITTER_POOL.execute(EmitterTask.createEmitterTask(soapMessage, new URL(epr.getAddress().toString())));
-
-                }
-                else
-                {
-                    // TODO: raw notifications ///could we build the same as above and simply strip off the notify?? not sure
-                }
-            }
-        }
-    }
-   private SOAPMessage buildSOAPMessage( XmlObject fullMsgBodyElem,
-                                         EndpointReferenceType consumerEPR )
-   throws Exception
-   {
-      SOAPMessage  msg      = MessageFactory.newInstance(  ).createMessage(  );
-      SOAPEnvelope envelope = msg.getSOAPPart(  ).getEnvelope(  );
-      SOAPBody     body     = envelope.getBody(  );
-      body.addDocument( (Document)fullMsgBodyElem.newDomNode() );
-      SOAPHeader header = msg.getSOAPHeader(  );
-      addWSAHeaders( header, consumerEPR );
-
-      return msg;
-   }
-
-    /*
-     * Add WS-Addressing headers to a notification.
-     *
-     * @param header - The header to which to add the WS-Addressing headers.
-     * @param consumerEPR - An endpointReferece to the consumer of this notification.
-     * @throws Exception
-     */
-    private void addWSAHeaders( SOAPHeader            header,
-                                EndpointReferenceType consumerEPR )
-    throws Exception
-    {
-       SOAPFactory       factory = SOAPFactory.newInstance(  );
-
-       SOAPHeaderElement h = header.addHeaderElement( factory.createName( Addressing_03_2003_Constants.TO,
-                                                                          Addressing_03_2003_Constants.NSPREFIX_WSRL_SCHEMA,
-                                                                          Addressing_03_2003_Constants.NSURI_WSRL_SCHEMA ) );
-
-       h.addTextNode( consumerEPR.getAddress(  ).getStringValue(  ) );
-
-       h = header.addHeaderElement( factory.createName( Addressing_03_2003_Constants.ACTION ,
-                                                        Addressing_03_2003_Constants.NSPREFIX_WSRL_SCHEMA
-                                                      , Addressing_03_2003_Constants.NSURI_WSRL_SCHEMA ) );
-       h.addTextNode( BaseNotification1_2Constants.NOTIFY_ACTION_URL );
-
-       ReferencePropertiesType props  = consumerEPR.getReferenceProperties(  );
-       XmlCursor               cursor = props.newCursor(  );
-
-       boolean                 haveChild = cursor.toFirstChild(  );
-
-       while ( haveChild )
-       {
-          SOAPElement e =  XmlBeanUtils.toSOAPElement( cursor.getObject(  ) );
-          h = header.addHeaderElement( e.getElementName(  ) );
-          h.addTextNode( e.getValue(  ) );
-
-          haveChild = cursor.toNextSibling(  );
-       }
-
-       cursor.dispose(  );
-    }
-
-}
+/*=============================================================================*
+ *  Copyright 2004 The Apache Software Foundation
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *=============================================================================*/
+package org.apache.ws.notification.topics.impl;
+
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ws.XmlObjectWrapper;
+import org.apache.ws.addressing.Addressing_03_2003_Constants;
+import org.apache.ws.addressing.EndpointReference;
+import org.apache.ws.notification.base.Subscription;
+import org.apache.ws.notification.base.v1_2.BaseNotification1_2Constants;
+import org.apache.ws.notification.topics.Topic;
+import org.apache.ws.notification.topics.TopicExpression;
+import org.apache.ws.notification.topics.TopicExpressionEngine;
+import org.apache.ws.notification.topics.TopicExpressionEvaluator;
+import org.apache.ws.notification.topics.TopicListener;
+import org.apache.ws.notification.topics.topicexpression.impl.TopicExpressionException;
+import org.apache.ws.pubsub.NotificationConsumer;
+import org.apache.ws.pubsub.emitter.EmitterTask;
+import org.apache.ws.util.XmlBeanUtils;
+import org.apache.ws.util.JaxpUtils;
+import org.apache.ws.util.thread.NamedThread;
+import org.apache.xmlbeans.XmlCursor;
+import org.apache.xmlbeans.XmlObject;
+import org.apache.xmlbeans.XmlOptions;
+import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.NotificationMessageHolderType;
+import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.NotifyDocument;
+import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.TopicExpressionType;
+import org.w3c.dom.Document;
+import org.w3c.dom.Node;
+import org.w3c.dom.DOMException;
+import org.xmlsoap.schemas.ws.x2003.x03.addressing.EndpointReferenceType;
+import org.xmlsoap.schemas.ws.x2003.x03.addressing.ReferencePropertiesType;
+import org.xml.sax.SAXException;
+
+import javax.xml.soap.MessageFactory;
+import javax.xml.soap.SOAPBody;
+import javax.xml.soap.SOAPElement;
+import javax.xml.soap.SOAPEnvelope;
+import javax.xml.soap.SOAPFactory;
+import javax.xml.soap.SOAPHeader;
+import javax.xml.soap.SOAPHeaderElement;
+import javax.xml.soap.SOAPMessage;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import java.io.Serializable;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URL;
+import java.util.List;
+
+/**
+ * Topic listener implementation that will trigger notifications when a topic
+ * changes. To be used in conjunction with the SimpleSubscription class.
+ *
+ * @see SimpleSubscription
+ */
+public class SimpleSubscriptionTopicListener
+   implements TopicListener,
+              NotificationConsumer,
+              Serializable
+{
+   private static Log logger = LogFactory.getLog( SimpleSubscriptionTopicListener.class.getName(  ) );
+
+   // the thread pool used to emit notifications
+   private static final PooledExecutor EMITTER_POOL;
+
+   static
+   {
+      EMITTER_POOL = new PooledExecutor( 100 );
+
+      // make sure the threads are non-daemon threads so they have time to complete even if the JVM wants to shut down
+      EMITTER_POOL.setThreadFactory( new NamedThread.ConcurrentThreadFactory( "notifmgr-emitter", false ) );
+   }
+
+   private Subscription m_subscription;
+   private EndpointReference m_epr;
+
+   /**
+    * Construct a listener instance.
+    *
+    * @param subscription the subscription which is being wrapped in a listener
+    */
+   public SimpleSubscriptionTopicListener( Subscription subscription )
+   {
+      this.m_subscription = subscription;
+      subscription.setNotificationConsumer( this );
+      m_epr = subscription.getConsumerReference(  );
+   }
+
+   /**
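+    * Returns the endpoint reference held by this listener.
+    *
+    * @return the consumer reference read from the subscription at construction time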
+    */
+   public EndpointReference getEPR(  )
+   {
+      return m_epr;
+   }
+
+   /**
+    * what's this for??
+    *
+    * @return
+    */
+   public int getMode(  )
+   {
+      return 0;
+   }
+
+   /**
+    * @return Subscription
+    */
+   public Subscription getSubscription(  )
+   {
+      return this.m_subscription;
+   }
+
+   /**
+    * DOCUMENT_ME
+    *
+    * @param subscription DOCUMENT_ME
+    * @param status DOCUMENT_ME
+    * @param reason DOCUMENT_ME
+    */
+   public void end( org.apache.ws.pubsub.Subscription subscription,
+                    URI                               status,
+                    String                            reason )
+   {
+   }
+
+   /**
+    * DOCUMENT_ME
+    *
+    * @param subscription DOCUMENT_ME
+    * @param message DOCUMENT_ME
+    */
+   public void notify( org.apache.ws.pubsub.Subscription subscription,
+                       Object                            message )
+   {
+      try
+      {
+         notify( (Subscription) subscription, message );
+      }
+      catch ( Exception e )
+      {
+         throw new RuntimeException( e );
+      }
+   }
+
+   /**
+    * Send a notification
+    *
+    * @param subscription The subscription for which to send the notification
+    * @param msg     The new value of the topic
+    * @throws Exception
+    */
+   public void notify( Subscription subscription,
+                       Object       msg )
+   throws Exception
+   {
+      synchronized ( subscription )
+      {
+         if ( !subscription.isPaused(  ) )
+         {
+            System.out.println( "Notification received for subscription with Id " + subscription.getID(  )
+                                + "; value: " + msg );
+
+            EndpointReference             epr                           =
+               subscription.getNotificationConsumer(  ).getEPR(  );
+            NotifyDocument                notifyDoc                     = NotifyDocument.Factory.newInstance(  );
+            NotifyDocument.Notify         notify                        = notifyDoc.addNewNotify(  );
+            NotificationMessageHolderType notificationMessageHolderType = notify.addNewNotificationMessage(  );
+
+            //assumes xmlobject for msg...this needs to change
+            notificationMessageHolderType.setMessage( (XmlObject) msg );
+
+            //set the producer ref
+            notificationMessageHolderType.setProducerReference( (EndpointReferenceType) ( (XmlObjectWrapper) subscription.getNotificationProducer(  )
+                                                                                                                         .getEPR(  ) )
+                                                                .getXmlObject(  ) );
+
+            TopicExpression     topicExpressionIntf = subscription.getTopicExpression(  );
+            TopicExpressionType tp =
+               (TopicExpressionType) ( (XmlObjectWrapper) topicExpressionIntf ).getXmlObject(  );
+            notificationMessageHolderType.setTopic( tp );
+            Document document = toDocument(subscription, notifyDoc);
+
+             SOAPMessage soapMessage =
+                  buildSOAPMessage( document, (EndpointReferenceType) ( (XmlObjectWrapper) epr ).getXmlObject(  ) );
+
+               EMITTER_POOL.execute( EmitterTask.createEmitterTask( soapMessage,
+                                                                    new URL( epr.getAddress(  ).toString(  ) ) ) );
+         }
+      }
+   }
+
+    private Document toDocument(Subscription subscription, NotifyDocument notifyDoc)
+            throws ParserConfigurationException, SAXException, IOException
+    {
+        Document document = null;
+        //notify is now populated
+        if (subscription.getUseNotify(  ))
+        {
+            document = (Document) notifyDoc.newDomNode();
+        }
+        else
+        {
+            NotificationMessageHolderType notificationMessageArray = notifyDoc.getNotify().getNotificationMessageArray(0);
+            String s = notificationMessageArray.xmlText(new XmlOptions().setSaveOuter());
+            document = JaxpUtils.toDocument( s );
+        }
+        return document;
+    }
+
+    /**
+    * DOCUMENT_ME
+    *
+    * @param topic DOCUMENT_ME
+    */
+   public void topicAdded( Topic topic )
+   {
+       //todo
+   }
+
+   /**
+    * DOCUMENT_ME
+    *
+    * @param topic DOCUMENT_ME
+    */
+   public void topicChanged( Topic topic )
+   {
+      Subscription subscription = m_subscription;
+
+      if ( subscription != null )
+      {
+         try
+         {
+            this.notify( subscription,
+                         topic.getCurrentMessage(  ) );
+         }
+         catch ( Exception e )
+         {
+            logger.warn( "notificationFailed" + subscription );
+            logger.debug( "", e );
+         }
+      }
+   }
+
+   /**
+    * DOCUMENT_ME
+    *
+    * @param topic DOCUMENT_ME
+    */
+   public void topicRemoved( Topic topic )
+   {
+       //todo
+   }
+
+   /*
+    * Add WS-Addressing headers to a notification.
+    *
+    * @param header - The header to which to add the WS-Addressing headers.
+    * @param consumerEPR - An endpoint reference to the consumer of this notification.
+    * @throws Exception
+    */
+   private void addWSAHeaders( SOAPHeader            header,
+                               EndpointReferenceType consumerEPR )
+   throws Exception
+   {
+      SOAPFactory       factory = SOAPFactory.newInstance(  );
+
+      SOAPHeaderElement h =
+         header.addHeaderElement( factory.createName( Addressing_03_2003_Constants.TO,
+                                                      Addressing_03_2003_Constants.NSPREFIX_WSRL_SCHEMA,
+                                                      Addressing_03_2003_Constants.NSURI_WSRL_SCHEMA ) );
+
+      h.addTextNode( consumerEPR.getAddress(  ).getStringValue(  ) );
+
+      h = header.addHeaderElement( factory.createName( Addressing_03_2003_Constants.ACTION,
+                                                       Addressing_03_2003_Constants.NSPREFIX_WSRL_SCHEMA,
+                                                       Addressing_03_2003_Constants.NSURI_WSRL_SCHEMA ) );
+      h.addTextNode( BaseNotification1_2Constants.NOTIFY_ACTION_URL );
+
+      ReferencePropertiesType props = consumerEPR.getReferenceProperties(  );
+      if ( props != null )
+      {
+         XmlCursor cursor = props.newCursor(  );
+
+         boolean   haveChild = cursor.toFirstChild(  );
+
+         while ( haveChild )
+         {
+            SOAPElement e = XmlBeanUtils.toSOAPElement( cursor.getObject(  ) );
+            h = header.addHeaderElement( e.getElementName(  ) );
+            h.addTextNode( e.getValue(  ) );
+
+            haveChild = cursor.toNextSibling(  );
+         }
+
+         cursor.dispose(  );
+      }
+   }
+
+   private SOAPMessage buildSOAPMessage( Document             fullMsgBodyElem,
+                                         EndpointReferenceType consumerEPR )
+   throws Exception
+   {
+      SOAPMessage  msg      = MessageFactory.newInstance(  ).createMessage(  );
+      SOAPEnvelope envelope = msg.getSOAPPart(  ).getEnvelope(  );
+      SOAPBody     body     = envelope.getBody(  );
+      body.addDocument(fullMsgBodyElem);
+      SOAPHeader header = msg.getSOAPHeader(  );
+      addWSAHeaders( header, consumerEPR );
+
+      return msg;
+   }
+}
\ No newline at end of file



---------------------------------------------------------------------
To unsubscribe, e-mail: hermes-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: hermes-dev-help@ws.apache.org