You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pubscribe-dev@ws.apache.org by sc...@apache.org on 2005/02/02 19:17:09 UTC
svn commit: r149542 - in
incubator/hermes/trunk/src/java/org/apache/ws/notification:
base/impl/AbstractSubscription.java
topics/impl/SimpleSubscriptionTopicListener.java
Author: scamp
Date: Wed Feb 2 10:17:07 2005
New Revision: 149542
URL: http://svn.apache.org/viewcvs?view=rev&rev=149542
Log:
updated...for emitter
Modified:
incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java
incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java
Modified: incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java?view=diff&r1=149541&r2=149542
==============================================================================
--- incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java (original)
+++ incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java Wed Feb 2 10:17:07 2005
@@ -311,7 +311,7 @@
public NotificationConsumer getNotificationConsumer()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public NotificationProducer getNotificationProducer()
Modified: incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java?view=diff&r1=149541&r2=149542
==============================================================================
--- incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java (original)
+++ incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java Wed Feb 2 10:17:07 2005
@@ -21,11 +21,32 @@
import org.apache.ws.notification.base.Subscription;
import org.apache.ws.notification.topics.Topic;
import org.apache.ws.notification.topics.TopicListener;
+import org.apache.ws.notification.topics.TopicExpressionEngine;
+import org.apache.ws.notification.topics.TopicExpression;
+import org.apache.ws.notification.topics.TopicExpressionEvaluator;
+import org.apache.ws.notification.topics.topicexpression.impl.TopicExpressionException;
import org.apache.ws.pubsub.NotificationConsumer;
+import org.apache.ws.pubsub.emitter.EmitterTask;
+import org.apache.ws.util.thread.NamedThread;
+import org.apache.ws.XmlObjectWrapper;
+import org.apache.axis.message.addressing.EndpointReference;
+import org.apache.xmlbeans.XmlObject;
import org.xmlsoap.schemas.ws.x2003.x03.addressing.EndpointReferenceType;
+import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.NotifyDocument;
+import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.NotificationMessageHolderType;
+import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.TopicExpressionType;
+import org.w3c.dom.Document;
import java.io.Serializable;
import java.util.List;
+import java.net.URL;
+
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+
+import javax.xml.soap.SOAPMessage;
+import javax.xml.soap.MessageFactory;
+import javax.xml.soap.SOAPHeader;
+import javax.xml.soap.SOAPBody;
/**
* Topic listener implementation that will trigger notifications when a topic
@@ -44,6 +65,17 @@
private Subscription m_subscription;
+ // the thread pool used to emit notifications
+ private static final PooledExecutor EMITTER_POOL;
+
+ static
+ {
+ EMITTER_POOL = new PooledExecutor( 100 );
+
+ // make sure the threads are non-daemon threads so they have time to complete even if the JVM wants to shut down
+ EMITTER_POOL.setThreadFactory( new NamedThread.ConcurrentThreadFactory( "notifmgr-emitter", false ) );
+ }
+
/**
* Construct a listener instance.
*
@@ -112,20 +144,27 @@
{
System.out.println("Notification received for subscription with Id " + subscription.getID() + "; value: " + msg);
- /* EndpointReferenceType consumerEPR =
- subscription.getNotificationConsumer().getEPR();
+ EndpointReference epr = subscription.getNotificationConsumer().getEPR();
+
if(subscription.getUseNotify())
{
- setPort(consumerEPR);
- NotifyDocument.Notify notification =NotifyDocument.Notify.Factory.newInstance();
- EndpointReferenceType producerEndpoint =
- subscription.getNotificationProducer().getEPR();
+ NotifyDocument notifyDoc = NotifyDocument.Factory.newInstance();
+ NotifyDocument.Notify notify = notifyDoc.addNewNotify();
+ NotificationMessageHolderType notificationMessageHolderType = notify.addNewNotificationMessage();
+
+ //assumes xmlobject for msg...this needs to change
+ notificationMessageHolderType.setMessage((XmlObject) msg);
+
+ EndpointReferenceType endpointReferenceType = notificationMessageHolderType.addNewProducerReference();
+ //todo once epr is resolved
+ //notificationMessageHolderType.setProducerReference(subscription.getNotificationProducer());
+
TopicExpressionEngine engine =
TopicExpressionEngineImpl.getInstance();
TopicExpression topicExpressionIntf = subscription.getTopicExpression();
TopicExpressionType tp = TopicExpressionType.Factory.newInstance();
- //todo
+ //todo ???
String dialect = topicExpressionIntf.getDialect().toString();
@@ -143,55 +182,29 @@
}
- NotificationMessageHolderType[] message =
- {NotificationMessageHolderType.Factory.newInstance()};
- message[0].setProducerReference(producerEndpoint);
- message[0].setMessage(newValue);
- message[0].setTopic(topicExpression);
- notification.setNotificationMessage(message);
- this.consumerPort.notify(notification);
+ notificationMessageHolderType.setTopic((TopicExpressionType) ((XmlObjectWrapper)topicExpression).getXmlObject());
+ //notify is done
+
+ //build soap message
+ SOAPMessage soapMsg = MessageFactory.newInstance().createMessage();
+
+ SOAPHeader soapHeader = soapMsg.getSOAPHeader();
+ //set the wsa headers
+
+ SOAPBody soapBody = soapMsg.getSOAPBody();
+ //doubt this will work...but let's give it a try using a Document type "notifyDoc"
+ soapBody.addDocument((Document) notifyDoc.newDomNode());
+
+ EMITTER_POOL.execute(EmitterTask.createEmitterTask(soapMsg, new URL(epr.getAddress().toString())));
}
else
{
- // TODO: raw notifications
- } */
+ // TODO: raw notifications ///could we build the same as above and simply strip off the notify?? not sure
+ }
}
}
}
- //TODO: revisit this from perf angle, don't thing we need to regenerate
- // the stub quite as aggressively
- private void setPort(
-
- EndpointReferenceType consumerEPR)
- throws Exception
- {
- setPort(true, consumerEPR);
- }
-
- private synchronized void setPort(
- boolean reuse,
- EndpointReferenceType consumerEPR)
- throws Exception
- {
-
- logger.debug("set port with " + reuse);
- if((reuse) && (this.consumerPort != null))
- {
- return;
- }
- //todo decide what to do here...
- /* if(this.locator == null)
- {
- this.locator = new WSBaseNotificationServiceAddressingLocator();
- }
-
- if((this.consumerPort == null) || (!reuse))
- {
- this.consumerPort =
- this.locator.getNotificationConsumerPort(consumerEPR);
- }*/
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: hermes-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: hermes-dev-help@ws.apache.org