You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pubscribe-dev@ws.apache.org by sc...@apache.org on 2005/02/02 19:17:09 UTC

svn commit: r149542 - in incubator/hermes/trunk/src/java/org/apache/ws/notification: base/impl/AbstractSubscription.java topics/impl/SimpleSubscriptionTopicListener.java

Author: scamp
Date: Wed Feb  2 10:17:07 2005
New Revision: 149542

URL: http://svn.apache.org/viewcvs?view=rev&rev=149542
Log:
updated...for emitter

Modified:
    incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java
    incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java

Modified: incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java?view=diff&r1=149541&r2=149542
==============================================================================
--- incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java (original)
+++ incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java Wed Feb  2 10:17:07 2005
@@ -311,7 +311,7 @@
 
     public NotificationConsumer getNotificationConsumer()
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        return null;
     }
 
     public NotificationProducer getNotificationProducer()

Modified: incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java?view=diff&r1=149541&r2=149542
==============================================================================
--- incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java (original)
+++ incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java Wed Feb  2 10:17:07 2005
@@ -21,11 +21,32 @@
 import org.apache.ws.notification.base.Subscription;
 import org.apache.ws.notification.topics.Topic;
 import org.apache.ws.notification.topics.TopicListener;
+import org.apache.ws.notification.topics.TopicExpressionEngine;
+import org.apache.ws.notification.topics.TopicExpression;
+import org.apache.ws.notification.topics.TopicExpressionEvaluator;
+import org.apache.ws.notification.topics.topicexpression.impl.TopicExpressionException;
 import org.apache.ws.pubsub.NotificationConsumer;
+import org.apache.ws.pubsub.emitter.EmitterTask;
+import org.apache.ws.util.thread.NamedThread;
+import org.apache.ws.XmlObjectWrapper;
+import org.apache.axis.message.addressing.EndpointReference;
+import org.apache.xmlbeans.XmlObject;
 import org.xmlsoap.schemas.ws.x2003.x03.addressing.EndpointReferenceType;
+import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.NotifyDocument;
+import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.NotificationMessageHolderType;
+import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.TopicExpressionType;
+import org.w3c.dom.Document;
 
 import java.io.Serializable;
 import java.util.List;
+import java.net.URL;
+
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+
+import javax.xml.soap.SOAPMessage;
+import javax.xml.soap.MessageFactory;
+import javax.xml.soap.SOAPHeader;
+import javax.xml.soap.SOAPBody;
 
 /**
  * Topic listener implementation that will trigger notifications when a topic
@@ -44,6 +65,17 @@
 
     private Subscription m_subscription;
 
+     // the thread pool used to emit notifications
+   private static final PooledExecutor EMITTER_POOL;
+
+    static
+   {
+      EMITTER_POOL = new PooledExecutor( 100 );
+
+      // make sure the threads are non-daemon threads so they have time to complete even if the JVM wants to shut down
+      EMITTER_POOL.setThreadFactory( new NamedThread.ConcurrentThreadFactory( "notifmgr-emitter", false ) );
+   }
+
     /**
      * Construct a listener instance.
      *
@@ -112,20 +144,27 @@
             {
                 System.out.println("Notification received for subscription with Id " + subscription.getID() + "; value: " + msg);
 
-              /*  EndpointReferenceType consumerEPR =
-                    subscription.getNotificationConsumer().getEPR();
+                EndpointReference epr = subscription.getNotificationConsumer().getEPR();
+
                 if(subscription.getUseNotify())
                 {
-                    setPort(consumerEPR);
-                    NotifyDocument.Notify notification =NotifyDocument.Notify.Factory.newInstance();
-                    EndpointReferenceType producerEndpoint =
-                        subscription.getNotificationProducer().getEPR();
+                    NotifyDocument notifyDoc = NotifyDocument.Factory.newInstance();
+                    NotifyDocument.Notify notify = notifyDoc.addNewNotify();
+                    NotificationMessageHolderType notificationMessageHolderType = notify.addNewNotificationMessage();
+
+                    //assumes xmlobject for msg...this needs to change
+                    notificationMessageHolderType.setMessage((XmlObject) msg);
+
+                    EndpointReferenceType endpointReferenceType = notificationMessageHolderType.addNewProducerReference();
+                    //todo once epr is resolved
+                    //notificationMessageHolderType.setProducerReference(subscription.getNotificationProducer());
+
                     TopicExpressionEngine engine =
                         TopicExpressionEngineImpl.getInstance();
                     TopicExpression topicExpressionIntf = subscription.getTopicExpression();
 
                     TopicExpressionType tp = TopicExpressionType.Factory.newInstance();
-                    //todo
+                    //todo ???
 
 
                     String dialect = topicExpressionIntf.getDialect().toString();
@@ -143,55 +182,29 @@
 
 
                     }
-                    NotificationMessageHolderType[] message =
-                        {NotificationMessageHolderType.Factory.newInstance()};
-                    message[0].setProducerReference(producerEndpoint);
-                    message[0].setMessage(newValue);
-                    message[0].setTopic(topicExpression);
-                    notification.setNotificationMessage(message);
-                    this.consumerPort.notify(notification);
+                    notificationMessageHolderType.setTopic((TopicExpressionType) ((XmlObjectWrapper)topicExpression).getXmlObject());
+                    //notify is done
+
 
+                    //build soap message
+                    SOAPMessage soapMsg = MessageFactory.newInstance().createMessage();
+
+                    SOAPHeader soapHeader = soapMsg.getSOAPHeader();
+                    //set the wsa headers
+
+                    SOAPBody soapBody = soapMsg.getSOAPBody();
+                    //doubt this will work... but let's give it a try using a Document type "notifyDoc"
+                    soapBody.addDocument((Document) notifyDoc.newDomNode());
+
+                    EMITTER_POOL.execute(EmitterTask.createEmitterTask(soapMsg, new URL(epr.getAddress().toString())));
 
                 }
                 else
                 {
-                    // TODO: raw notifications
-                }      */
+                    // TODO: raw notifications -- could we build the same as above and simply strip off the notify? not sure
+                }
             }
         }
     }
 
-    //TODO: revisit this from perf angle, don't thing we need to regenerate
-    // the stub quite as aggressively
-    private void setPort(
-
-        EndpointReferenceType consumerEPR)
-        throws Exception
-    {
-            setPort(true, consumerEPR);
-    }
-
-    private synchronized void setPort(
-        boolean reuse,
-        EndpointReferenceType consumerEPR)
-        throws Exception
-    {
-
-        logger.debug("set port with " + reuse);
-        if((reuse) && (this.consumerPort != null))
-        {
-            return;
-        }
-                         //todo decide what to do here...
-       /* if(this.locator == null)
-        {
-            this.locator = new WSBaseNotificationServiceAddressingLocator();
-        }
-
-        if((this.consumerPort == null) || (!reuse))
-        {
-            this.consumerPort =
-                this.locator.getNotificationConsumerPort(consumerEPR);
-        }*/
-    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: hermes-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: hermes-dev-help@ws.apache.org