You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pubscribe-dev@ws.apache.org by ip...@apache.org on 2005/05/06 01:31:27 UTC
svn commit: r168467 - in /incubator/hermes/trunk: ./
src/java/org/apache/ws/eventing/porttype/impl/
src/java/org/apache/ws/eventing/services/
src/java/org/apache/ws/notification/base/
src/java/org/apache/ws/notification/base/impl/
src/java/org/apache/ws/notification/base/v2004_6/porttype/impl/
src/java/org/apache/ws/notification/topics/impl/
src/java/org/apache/ws/notification/topics/util/
src/java/org/apache/ws/pubsub/
Author: ips
Date: Thu May 5 16:31:26 2005
New Revision: 168467
URL: http://svn.apache.org/viewcvs?rev=168467&view=rev
Log:
added support for subscription precondition and selector; various minor changes
Modified:
incubator/hermes/trunk/maven.xml
incubator/hermes/trunk/src/java/org/apache/ws/eventing/porttype/impl/simpleNotificationPortTypeImpl.java
incubator/hermes/trunk/src/java/org/apache/ws/eventing/services/BrokerService.java
incubator/hermes/trunk/src/java/org/apache/ws/notification/base/NotificationProducerResource.java
incubator/hermes/trunk/src/java/org/apache/ws/notification/base/Subscription.java
incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java
incubator/hermes/trunk/src/java/org/apache/ws/notification/base/v2004_6/porttype/impl/NotificationProducerPortTypeImpl.java
incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java
incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/util/TopicUtils.java
incubator/hermes/trunk/src/java/org/apache/ws/pubsub/NotificationProducer.java
incubator/hermes/trunk/src/java/org/apache/ws/pubsub/Subscription.java
Modified: incubator/hermes/trunk/maven.xml
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/maven.xml?rev=168467&r1=168466&r2=168467&view=diff
==============================================================================
--- incubator/hermes/trunk/maven.xml (original)
+++ incubator/hermes/trunk/maven.xml Thu May 5 16:31:26 2005
@@ -110,21 +110,20 @@
</postGoal>
<goal name="extract-wsdl-jar">
- <echo>Extracting Wsdls from: ${wsdljar}</echo>
+ <echo>Extracting WSDLs from ${wsdljar} to ${spec.wsdl.dest.dir} ...</echo>
<mkdir dir="${maven.build.dir}/temp-wsdl" />
<mkdir dir="${spec.wsdl.dest.dir}" />
- <unzip src="${wsdljar}"
- dest="${maven.build.dir}/temp-wsdl">
- <patternset>
- <include name="**/*.wsdl"/>
- <include name="**/*.xsd"/>
- </patternset>
- </unzip>
- <copy todir="${spec.wsdl.dest.dir}">
- <fileset dir="${maven.build.dir}/temp-wsdl/schema/src/src/wsdl"/>
- </copy>
-
- <delete dir="${maven.build.dir}/temp-wsdl" />
+ <unzip src="${wsdljar}"
+ dest="${maven.build.dir}/temp-wsdl">
+ <patternset>
+ <include name="**/*.wsdl"/>
+ <include name="**/*.xsd"/>
+ </patternset>
+ </unzip>
+ <copy todir="${spec.wsdl.dest.dir}">
+ <fileset dir="${maven.build.dir}/temp-wsdl/schema" />
+ </copy>
+ <delete dir="${maven.build.dir}/temp-wsdl" />
</goal>
<!-- ================================================================== -->
Modified: incubator/hermes/trunk/src/java/org/apache/ws/eventing/porttype/impl/simpleNotificationPortTypeImpl.java
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/eventing/porttype/impl/simpleNotificationPortTypeImpl.java?rev=168467&r1=168466&r2=168467&view=diff
==============================================================================
--- incubator/hermes/trunk/src/java/org/apache/ws/eventing/porttype/impl/simpleNotificationPortTypeImpl.java (original)
+++ incubator/hermes/trunk/src/java/org/apache/ws/eventing/porttype/impl/simpleNotificationPortTypeImpl.java Thu May 5 16:31:26 2005
@@ -13,25 +13,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.ws.eventing.porttype.impl;
import org.apache.axis.message.SOAPEnvelope;
-import org.apache.ws.resource.properties.impl.AbstractResourcePropertiesPortType;
-import org.apache.ws.resource.ResourceContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.ws.eventing.Subscription;
-
-
import org.apache.ws.eventing.SubscriptionHome;
-
import javax.naming.Context;
import javax.naming.InitialContext;
-
-public class simpleNotificationPortTypeImpl implements org.apache.ws.eventing.porttype.NotificationPortType{
+public class SimpleNotificationPortTypeImpl implements org.apache.ws.eventing.porttype.NotificationPortType{
private static final Log LOG = LogFactory.getLog( NotificationPortTypeImpl.class.getName() );
private boolean m_broker;
Modified: incubator/hermes/trunk/src/java/org/apache/ws/eventing/services/BrokerService.java
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/eventing/services/BrokerService.java?rev=168467&r1=168466&r2=168467&view=diff
==============================================================================
--- incubator/hermes/trunk/src/java/org/apache/ws/eventing/services/BrokerService.java (original)
+++ incubator/hermes/trunk/src/java/org/apache/ws/eventing/services/BrokerService.java Thu May 5 16:31:26 2005
@@ -53,7 +53,7 @@
}
public void filter(org.apache.axis.message.SOAPEnvelope req, org.apache.axis.message.SOAPEnvelope resp) {
- new simpleNotificationPortTypeImpl().filter(req, resp);
+ new SimpleNotificationPortTypeImpl().filter(req, resp);
}
}
Modified: incubator/hermes/trunk/src/java/org/apache/ws/notification/base/NotificationProducerResource.java
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/notification/base/NotificationProducerResource.java?rev=168467&r1=168466&r2=168467&view=diff
==============================================================================
--- incubator/hermes/trunk/src/java/org/apache/ws/notification/base/NotificationProducerResource.java (original)
+++ incubator/hermes/trunk/src/java/org/apache/ws/notification/base/NotificationProducerResource.java Thu May 5 16:31:26 2005
@@ -16,13 +16,14 @@
package org.apache.ws.notification.base;
import org.apache.ws.notification.topics.TopicSpaceSet;
-import org.apache.ws.resource.Resource;
+import org.apache.ws.resource.PropertiesResource;
/**
- * A resource that implements the WS-BaseNotification NotificationProducer portType.
- * Provides access to the set of TopicSpaces supported by the resource.
+ * A resource that implements the WS-BaseNotification NotificationProducer
+ * portType. Provides access to the resource's {@link TopicSpaceSet} and
+ * {@link org.apache.ws.resource.properties.ResourcePropertySet}.
*/
-public interface NotificationProducerResource extends Resource
+public interface NotificationProducerResource extends PropertiesResource
{
TopicSpaceSet getTopicSpaceSet();
Modified: incubator/hermes/trunk/src/java/org/apache/ws/notification/base/Subscription.java
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/notification/base/Subscription.java?rev=168467&r1=168466&r2=168467&view=diff
==============================================================================
--- incubator/hermes/trunk/src/java/org/apache/ws/notification/base/Subscription.java (original)
+++ incubator/hermes/trunk/src/java/org/apache/ws/notification/base/Subscription.java Thu May 5 16:31:26 2005
@@ -20,7 +20,6 @@
import org.apache.ws.pubsub.NotificationConsumer;
import org.apache.ws.pubsub.NotificationProducer;
import org.apache.ws.resource.PropertiesResource;
-import org.apache.ws.resource.Resource;
import org.apache.ws.resource.lifetime.ScheduledResourceTerminationResource;
import org.apache.ws.resource.properties.query.QueryExpression;
@@ -64,13 +63,14 @@
public void pause() throws Exception;
- public void resume() throws Exception;
+ public void resume() throws Exception;
+
/**
- * Get the producing resource.
+ * Get the producer resource associated with this subscription.
*
- * @return the producing resource
+ * @return the producer resource
*/
- Resource getResource() throws Exception;
+ NotificationProducerResource getProducerResource() throws Exception;
void setEpr(EndpointReference epr);
Modified: incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java?rev=168467&r1=168466&r2=168467&view=diff
==============================================================================
--- incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java (original)
+++ incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java Thu May 5 16:31:26 2005
@@ -42,7 +42,6 @@
import org.apache.ws.util.uuid.UuidGenerator;
import org.apache.ws.util.uuid.UuidGeneratorFactory;
-import javax.naming.Context;
import javax.naming.InitialContext;
import javax.xml.rpc.JAXRPCException;
import java.net.URI;
@@ -132,11 +131,10 @@
return m_producerReference;
}
- public Resource getResource() throws Exception
+ public NotificationProducerResource getProducerResource() throws Exception
{
- Context initialContext = new InitialContext();
- ResourceHome producerHome = (ResourceHome) initialContext.lookup( m_producerHomeLocation );
- return producerHome.find( m_producerKey );
+ ResourceHome producerHome = (ResourceHome) new InitialContext().lookup( m_producerHomeLocation );
+ return (NotificationProducerResource) producerHome.find( m_producerKey );
}
public QueryExpression getSelector()
@@ -228,7 +226,7 @@
Resource producerResource = null;
try
{
- producerResource = getResource();
+ producerResource = getProducerResource();
}
catch ( Exception e )
{
@@ -287,7 +285,7 @@
private Topic[] evaluateTopicExpression() throws Exception
{
- NotificationProducerResource resource = (NotificationProducerResource) getResource();
+ NotificationProducerResource resource = (NotificationProducerResource) getProducerResource();
TopicSpaceSet topicSpaceSet = resource.getTopicSpaceSet();
try
{
Modified: incubator/hermes/trunk/src/java/org/apache/ws/notification/base/v2004_6/porttype/impl/NotificationProducerPortTypeImpl.java
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/notification/base/v2004_6/porttype/impl/NotificationProducerPortTypeImpl.java?rev=168467&r1=168466&r2=168467&view=diff
==============================================================================
--- incubator/hermes/trunk/src/java/org/apache/ws/notification/base/v2004_6/porttype/impl/NotificationProducerPortTypeImpl.java (original)
+++ incubator/hermes/trunk/src/java/org/apache/ws/notification/base/v2004_6/porttype/impl/NotificationProducerPortTypeImpl.java Thu May 5 16:31:26 2005
@@ -44,7 +44,12 @@
import org.apache.ws.resource.impl.AbstractResourceContext;
import org.apache.ws.resource.properties.NamespaceVersionHolder;
import org.apache.ws.resource.properties.impl.AbstractResourcePropertiesPortType;
+import org.apache.ws.resource.properties.query.QueryEngine;
import org.apache.ws.resource.properties.query.QueryExpression;
+import org.apache.ws.resource.properties.query.UnknownQueryExpressionDialectException;
+import org.apache.ws.resource.properties.query.QueryEvaluationErrorException;
+import org.apache.ws.resource.properties.query.InvalidQueryExpressionException;
+import org.apache.ws.resource.properties.query.impl.QueryEngineImpl;
import org.apache.ws.resource.properties.query.impl.XmlBeansQueryExpression;
import org.apache.ws.util.XmlBeanUtils;
import org.apache.xmlbeans.XmlObject;
@@ -53,6 +58,8 @@
import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.SubscribeDocument;
import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.SubscribeResponseDocument;
import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.TopicExpressionType;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
import org.xmlsoap.schemas.ws.x2003.x03.addressing.AttributedURI;
import org.xmlsoap.schemas.ws.x2003.x03.addressing.EndpointReferenceDocument;
import org.xmlsoap.schemas.ws.x2003.x03.addressing.EndpointReferenceType;
@@ -60,6 +67,8 @@
import javax.naming.Context;
import javax.naming.InitialContext;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
import javax.xml.rpc.JAXRPCException;
import java.util.Calendar;
@@ -72,6 +81,23 @@
implements NotificationProducerPortType, NotificationProducer
{
+ private static final QueryEngine QUERY_ENGINE = new QueryEngineImpl();
+ private static final Element TEST_EVAL_CONTEXT; // DOM element used to validate selector and precondition XPaths
+
+ static
+ {
+ Document doc = null;
+ try
+ {
+ doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument();
+ }
+ catch ( ParserConfigurationException pce )
+ {
+ throw new IllegalStateException( "Failed to create DOM document." );
+ }
+ TEST_EVAL_CONTEXT = doc.createElement( "Foo" );
+ }
+
public NotificationProducerPortTypeImpl( ResourceContext resourceContext )
{
super( resourceContext );
@@ -80,120 +106,90 @@
public SubscribeResponseDocument subscribe( SubscribeDocument requestDoc )
{
SubscribeDocument.Subscribe request = requestDoc.getSubscribe();
-
- //make sure to check if optional elements are set in request...
Calendar initialTerminationTime = null;
- if(request.getInitialTerminationTime() != null)
+ if ( request.isSetInitialTerminationTime() )
{
initialTerminationTime = request.getInitialTerminationTime();
}
-
- QueryExpression precondition = null;
- if(request.getPrecondition() != null)
- {
- precondition = new XmlBeansQueryExpression( request.getPrecondition() );
- }
-
- QueryExpression selector = null;
- if(request.getSelector() != null)
- {
- selector = new XmlBeansQueryExpression( request.getSelector() );
- }
-
XmlObject subPolicy = null;
- if(request.getSubscriptionPolicy() != null)
+ if ( request.isSetSubscriptionPolicy() )
{
subPolicy = request.getSubscriptionPolicy();
}
-
EndpointReferenceType epr = null;
try
{
+ QueryExpression precondition = null;
+ if ( request.isSetPrecondition() )
+ {
+ precondition = new XmlBeansQueryExpression( request.getPrecondition() );
+ validateQueryExpressionEvaluatesToBoolean( precondition );
+ }
+ QueryExpression selector = null;
+ if ( request.isSetSelector() )
+ {
+ selector = new XmlBeansQueryExpression( request.getSelector() );
+ validateQueryExpressionEvaluatesToBoolean( selector );
+ }
TopicExpression topicExpr = new XmlBeansTopicExpression( request.getTopicExpression() );
- //this prop has default values in schema..it "should" get a value either way if not explicitly set in request
+ // this prop has default values in schema..it "should" get a value either way if not explicitly set in request
boolean useNotify = request.isSetUseNotify() ? request.getUseNotify() : true;
EndpointReference producerEPR = buildEPR( getResourceContext() );
- Context initialContext = new InitialContext( );
- SubscriptionHome subscriptionHome = (SubscriptionHome) initialContext.lookup( SubscriptionHome.HOME_LOCATION );
-
- Subscription subscription = subscriptionHome.create(SubscriptionResource.class, new XmlBeansEndpointReference(request.getConsumerReference()), producerEPR, initialTerminationTime,subPolicy, precondition, selector,getResourceKey(), ((AbstractResourceContext)getResourceContext()).getResourceHomeLocation(),topicExpr,useNotify );
- subscription.setNotificationProducer(this);
- epr = (EndpointReferenceType) ((XmlObjectWrapper)subscription.getEpr()).getXmlObject();
-
- Topic[] topics = evaluateTopicExpression(topicExpr);
- SimpleSubscriptionTopicListener simpleSubscriptionTopicListener = new SimpleSubscriptionTopicListener(subscription);
+ Context initialContext = new InitialContext();
+ SubscriptionHome subscriptionHome = (SubscriptionHome) initialContext.lookup(
+ SubscriptionHome.HOME_LOCATION );
+
+ Subscription subscription = subscriptionHome.create( SubscriptionResource.class,
+ new XmlBeansEndpointReference( request.getConsumerReference() ), producerEPR, initialTerminationTime,
+ subPolicy, precondition, selector, getResourceKey(),
+ ( (AbstractResourceContext) getResourceContext() ).getResourceHomeLocation(), topicExpr, useNotify );
+ subscription.setNotificationProducer( this );
+ epr = (EndpointReferenceType) ( (XmlObjectWrapper) subscription.getEpr() ).getXmlObject();
+
+ Topic[] topics = evaluateTopicExpression( topicExpr );
+ SimpleSubscriptionTopicListener simpleSubscriptionTopicListener = new SimpleSubscriptionTopicListener(
+ subscription );
for ( int i = 0; i < topics.length; i++ )
{
- topics[i].addTopicListener(simpleSubscriptionTopicListener);
+ topics[i].addTopicListener( simpleSubscriptionTopicListener );
}
}
- catch (Exception e)
+ catch ( Exception e )
{
- throw new SubscribeCreationFailedFaultException(new WsnNamespaceVersionHolderImpl(),"Subscribe failed, reason: " + e.getLocalizedMessage());
+ throw new SubscribeCreationFailedFaultException( new WsnNamespaceVersionHolderImpl(),
+ "Subscribe failed, reason: " + e.getLocalizedMessage() );
}
-
SubscribeResponseDocument subscribeResponseDoc = SubscribeResponseDocument.Factory.newInstance();
SubscribeResponseDocument.SubscribeResponse subscribeResponse = subscribeResponseDoc.addNewSubscribeResponse();
- subscribeResponse.setSubscriptionReference( epr);
+ subscribeResponse.setSubscriptionReference( epr );
return subscribeResponseDoc;
}
- private Topic[] evaluateTopicExpression(TopicExpression topicExpr)
- {
- NotificationProducerResource resource = (NotificationProducerResource) getResource();
- TopicSpaceSet topicSpaceSet = resource.getTopicSpaceSet();
- try
- {
- return topicSpaceSet.evaluateTopicExpression(topicExpr);
- }
- catch (TopicExpressionException e)
- {
- throw new JAXRPCException("An exception occurred evaluating a TopicExpression. ", e);
- }
- }
-
- private static XmlBeansEndpointReference buildEPR( ResourceContext resourceContext )
- {
- EndpointReferenceDocument eprDoc = EndpointReferenceDocument.Factory.newInstance();
- EndpointReferenceType epr = eprDoc.addNewEndpointReference();
- AttributedURI address = epr.addNewAddress();
- address.setStringValue( resourceContext.getServiceURL().toString() );
- // TODO: set portType and serviceName! (read from config file)
- ReferencePropertiesType refProps = epr.addNewReferenceProperties();
- try
- {
- XmlBeanUtils.addChildElement( refProps, resourceContext.getResourceKey().getName() );
- }
- catch ( ResourceContextException rce )
- {
- throw new JAXRPCException( rce );
- }
- return new XmlBeansEndpointReference(epr);
- }
-
public GetCurrentMessageResponseDocument getCurrentMessage( GetCurrentMessageDocument requestDoc )
{
GetCurrentMessageDocument.GetCurrentMessage request = requestDoc.getGetCurrentMessage();
TopicExpressionType topicExprXmlBean = request.getTopic();
TopicExpression topicExpr = new XmlBeansTopicExpression( topicExprXmlBean );
- Topic[] topics = evaluateTopicExpression(topicExpr);
+ Topic[] topics = evaluateTopicExpression( topicExpr );
- if(topics.length == 0)
+ if ( topics.length == 0 )
{
- throw new NoCurrentMessageOnTopicFaultException(new WsnNamespaceVersionHolderImpl(),"There was no current message to retrieve.");
+ throw new NoCurrentMessageOnTopicFaultException( new WsnNamespaceVersionHolderImpl(),
+ "There was no current message to retrieve." );
}
GetCurrentMessageResponseDocument message = GetCurrentMessageResponseDocument.Factory.newInstance();
- GetCurrentMessageResponseDocument.GetCurrentMessageResponse response = message.addNewGetCurrentMessageResponse();
- if(topics.length > 1)
+ GetCurrentMessageResponseDocument.GetCurrentMessageResponse response = message.addNewGetCurrentMessageResponse();
+ if ( topics.length > 1 )
{
- throw new JAXRPCException("The topicexpression for GetCurrentMessage returned more than 1 value! This is an invalid request.");
+ throw new JAXRPCException(
+ "The topicexpression for GetCurrentMessage returned more than 1 value! This is an invalid request." );
}
else
{
XmlObject currentMessage = (XmlObject) topics[0].getCurrentMessage();
- XmlBeanUtils.addChildElement(response, currentMessage);
+ XmlBeanUtils.addChildElement( response, currentMessage );
}
//todo need to throw appropriate exceptions...
@@ -207,14 +203,59 @@
*/
public EndpointReference getEPR()
{
- return buildEPR(getResourceContext());
+ return buildEPR( getResourceContext() );
+ }
+
+ private void validateQueryExpressionEvaluatesToBoolean( QueryExpression precondition )
+ throws UnknownQueryExpressionDialectException, QueryEvaluationErrorException,
+ InvalidQueryExpressionException
+ {
+ Object queryResult = QUERY_ENGINE.executeQuery( precondition, TEST_EVAL_CONTEXT );
+ if ( !( queryResult instanceof Boolean ) )
+ {
+ throw new IllegalStateException( "Invalid precondition - expression does not return a Boolean." );
+ }
+ }
+
+ private Topic[] evaluateTopicExpression( TopicExpression topicExpr )
+ {
+ NotificationProducerResource resource = (NotificationProducerResource) getResource();
+ TopicSpaceSet topicSpaceSet = resource.getTopicSpaceSet();
+ try
+ {
+ return topicSpaceSet.evaluateTopicExpression( topicExpr );
+ }
+ catch ( TopicExpressionException e )
+ {
+ throw new JAXRPCException( "An exception occurred evaluating a TopicExpression. ", e );
+ }
+ }
+
+ private static XmlBeansEndpointReference buildEPR( ResourceContext resourceContext )
+ {
+ EndpointReferenceDocument eprDoc = EndpointReferenceDocument.Factory.newInstance();
+ EndpointReferenceType epr = eprDoc.addNewEndpointReference();
+ AttributedURI address = epr.addNewAddress();
+ address.setStringValue( resourceContext.getServiceURL().toString() );
+ // TODO: set portType and serviceName! (read from config file)
+ ReferencePropertiesType refProps = epr.addNewReferenceProperties();
+ try
+ {
+ XmlBeanUtils.addChildElement( refProps, resourceContext.getResourceKey().getName() );
+ }
+ catch ( ResourceContextException rce )
+ {
+ throw new JAXRPCException( rce );
+ }
+ return new XmlBeansEndpointReference( epr );
}
/**
* Subscribe to notifications from this producer.
*
* @param notificationConsumer
- * @param subscriptionEndConsumer the callback Interface for SubscriptionEnd Notifications, or null if no SubscriptionEnd should be send
+ * @param subscriptionEndConsumer the callback Interface for SubscriptionEnd Notifications, or null if no
+ * SubscriptionEnd should be send
* @param tf
* @param xf
* @param initialTerminationTime
@@ -222,7 +263,10 @@
*
* @return the subscription
*/
- public org.apache.ws.pubsub.Subscription subscribe(NotificationConsumer notificationConsumer, SubscriptionEndConsumer subscriptionEndConsumer, TopicFilter tf, XPathFilter xf, Calendar initialTerminationTime, boolean useNotify)
+ public org.apache.ws.pubsub.Subscription subscribe( NotificationConsumer notificationConsumer,
+ SubscriptionEndConsumer subscriptionEndConsumer,
+ TopicFilter tf, XPathFilter xf,
+ Calendar initialTerminationTime, boolean useNotify )
{
return null; //todo
}
@@ -234,10 +278,14 @@
* @param filters
* @param initialTerminationTime
* @param deliveryMode the notification delivery mode, or null to use default mode
- * @param policy a policy to be associated with the subscription, or null if no policy should be used
+ * @param policy a policy to be associated with the subscription, or null if no policy should be
+ * used
+ *
* @return the subscription
*/
- public org.apache.ws.pubsub.Subscription subscribe(NotificationConsumer notificationConsumer, Filter filters[], Calendar initialTerminationTime, DeliveryMode deliveryMode, Object policy)
+ public org.apache.ws.pubsub.Subscription subscribe( NotificationConsumer notificationConsumer, Filter filters[],
+ Calendar initialTerminationTime, DeliveryMode deliveryMode,
+ Object policy )
{
return null;
}
@@ -246,9 +294,10 @@
* Returns the last notification message published for the given set of filters.
*
* @param filters
+ *
* @return
*/
- public Object getCurrentMessage(Filter filters[])
+ public Object getCurrentMessage( Filter filters[] )
{
return null;
}
Modified: incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java?rev=168467&r1=168466&r2=168467&view=diff
==============================================================================
--- incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java (original)
+++ incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java Thu May 5 16:31:26 2005
@@ -19,7 +19,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ws.XmlObjectWrapper;
-import org.apache.ws.resource.properties.query.QueryExpression;
import org.apache.ws.addressing.Addressing_03_2003_Constants;
import org.apache.ws.addressing.EndpointReference;
import org.apache.ws.notification.base.Subscription;
@@ -27,8 +26,14 @@
import org.apache.ws.notification.topics.Topic;
import org.apache.ws.notification.topics.TopicExpression;
import org.apache.ws.notification.topics.TopicListener;
-import org.apache.ws.pubsub.NotificationConsumer;
import org.apache.ws.pubsub.emitter.EmitterTask;
+import org.apache.ws.resource.properties.ResourcePropertySet;
+import org.apache.ws.resource.properties.query.InvalidQueryExpressionException;
+import org.apache.ws.resource.properties.query.QueryEngine;
+import org.apache.ws.resource.properties.query.QueryEvaluationErrorException;
+import org.apache.ws.resource.properties.query.QueryExpression;
+import org.apache.ws.resource.properties.query.UnknownQueryExpressionDialectException;
+import org.apache.ws.resource.properties.query.impl.QueryEngineImpl;
import org.apache.ws.util.JaxpUtils;
import org.apache.ws.util.XmlBeanUtils;
import org.apache.ws.util.thread.NamedThread;
@@ -54,7 +59,6 @@
import javax.xml.soap.SOAPMessage;
import java.io.IOException;
import java.io.Serializable;
-import java.net.URI;
import java.net.URL;
/**
@@ -64,7 +68,6 @@
*/
public class SimpleSubscriptionTopicListener
implements TopicListener,
- NotificationConsumer,
Serializable
{
private static Log LOG = LogFactory.getLog( SimpleSubscriptionTopicListener.class.getName() );
@@ -80,8 +83,9 @@
EMITTER_POOL.setThreadFactory( new NamedThread.ConcurrentThreadFactory( "notifmgr-emitter", false ) );
}
+ private static final QueryEngine QUERY_ENGINE = new QueryEngineImpl();
+
private Subscription m_subscription;
- private EndpointReference m_epr;
/**
* Construct a listener instance.
@@ -90,29 +94,7 @@
*/
public SimpleSubscriptionTopicListener( Subscription subscription )
{
- this.m_subscription = subscription;
- subscription.setNotificationConsumer( this );
- m_epr = subscription.getConsumerReference();
- }
-
- /**
- * DOCUMENT_ME
- *
- * @return DOCUMENT_ME
- */
- public EndpointReference getEPR()
- {
- return m_epr;
- }
-
- /**
- * what's this for??
- *
- * @return
- */
- public int getMode()
- {
- return 0;
+ m_subscription = subscription;
}
/**
@@ -120,109 +102,121 @@
*/
public Subscription getSubscription()
{
- return this.m_subscription;
+ return m_subscription;
}
- /**
- * DOCUMENT_ME
- *
- * @param subscription DOCUMENT_ME
- * @param status DOCUMENT_ME
- * @param reason DOCUMENT_ME
- */
- public void end( org.apache.ws.pubsub.Subscription subscription,
- URI status,
- String reason )
+ private void notify( Object rawMsg )
+ throws Exception
{
+ synchronized ( m_subscription )
+ {
+ if ( !m_subscription.isPaused() )
+ {
+ LOG.debug( "Notification being sent for subscription with id " + m_subscription.getID()
+ + "; message value: " + rawMsg );
+ XmlObject msg = XmlBeanUtils.toXmlObject( rawMsg );
+ if ( evaluateSelector( m_subscription.getSelector(), msg ) &&
+ evaluatePrecondition( m_subscription.getPrecondition(),
+ m_subscription.getProducerResource().getResourcePropertySet() ) )
+ {
+ if ( m_subscription.getUseNotify() ) // wrap message in Notify element
+ {
+ msg = wrapMessageWithNotify( msg );
+ }
+ Document document = toDomDocument( msg );
+ EndpointReference consumerEPR = m_subscription.getNotificationConsumer().getEPR();
+ SOAPMessage soapMessage =
+ buildSOAPMessage( document,
+ (EndpointReferenceType) ( (XmlObjectWrapper) consumerEPR ).getXmlObject() );
+ EMITTER_POOL.execute( EmitterTask.createEmitterTask( soapMessage,
+ new URL( consumerEPR.getAddress().toString() ) ) );
+ }
+ }
+ }
}
- /**
- * DOCUMENT_ME
- *
- * @param subscription DOCUMENT_ME
- * @param message DOCUMENT_ME
- */
- public void notify( org.apache.ws.pubsub.Subscription subscription,
- Object message )
+ private XmlObject wrapMessageWithNotify( XmlObject msg )
{
- try
+ NotifyDocument notifyDoc = NotifyDocument.Factory.newInstance();
+ NotifyDocument.Notify notify = notifyDoc.addNewNotify();
+ NotificationMessageHolderType notificationMessageHolderType = notify.addNewNotificationMessage();
+ notificationMessageHolderType.setMessage( msg );
+ notificationMessageHolderType.setProducerReference( (EndpointReferenceType) ( (XmlObjectWrapper) m_subscription.getNotificationProducer()
+ .getEPR() )
+ .getXmlObject() );
+ TopicExpression topicExpressionIntf = m_subscription.getTopicExpression();
+ TopicExpressionType tp =
+ (TopicExpressionType) ( (XmlObjectWrapper) topicExpressionIntf ).getXmlObject();
+ notificationMessageHolderType.setTopic( tp );
+ msg = notifyDoc;
+ return msg;
+ }
+
+ private boolean evaluateSelector( QueryExpression selector, XmlObject msg )
+ throws UnknownQueryExpressionDialectException, QueryEvaluationErrorException,
+ InvalidQueryExpressionException
+ {
+ boolean result;
+ if ( selector == null )
{
- notify( (Subscription) subscription, message );
+ result = true;
}
- catch ( Exception e )
+ else
{
- throw new RuntimeException( e );
+ Object queryResult = QUERY_ENGINE.executeQuery( selector, msg );
+ try
+ {
+ result = ( (Boolean) queryResult ).booleanValue();
+ }
+ catch ( RuntimeException re )
+ {
+ result = false;
+ LOG.error(
+ "Notification selector '" + selector + "' did not evaluate to a Boolean at notification time." );
+ }
+ LOG.debug(
+ "Notification selector '" + selector + "' evaluated to " + result + " for subscription with id " +
+ m_subscription.getID() +
+ "." );
}
+ return result;
}
- /**
- * Send a notification
- *
- * @param subscription The subscription for which to send the notification
- * @param rawMsg The new value of the topic
- *
- * @throws Exception
- */
- public void notify( Subscription subscription,
- Object rawMsg )
+ private boolean evaluatePrecondition( QueryExpression precondition, ResourcePropertySet propSet )
throws Exception
{
- XmlObject message = null;
- synchronized ( subscription )
+ boolean result;
+ if ( precondition == null )
+ {
+ result = true;
+ }
+ else
{
- if ( !subscription.isPaused() )
+ Object queryResult = QUERY_ENGINE.executeQuery( precondition, propSet );
+ try
{
- LOG.debug( "Notification being sent for subscription with Id " + subscription.getID()
- + "; message value: " + rawMsg );
- if ( subscription.getSelector() != null )
- {
- QueryExpression selector = subscription.getSelector();
-
- }
- EndpointReference epr =
- subscription.getNotificationConsumer().getEPR();
- if ( subscription.getUseNotify() )
- {
- NotifyDocument notifyDoc = NotifyDocument.Factory.newInstance();
- NotifyDocument.Notify notify = notifyDoc.addNewNotify();
- NotificationMessageHolderType notificationMessageHolderType = notify.addNewNotificationMessage();
-
- //assumes xmlobject for rawMsg...this needs to change
- notificationMessageHolderType.setMessage( (XmlObject) rawMsg );
-
- //set the producer ref
- notificationMessageHolderType.setProducerReference( (EndpointReferenceType) ( (XmlObjectWrapper) subscription.getNotificationProducer()
- .getEPR() )
- .getXmlObject() );
-
- TopicExpression topicExpressionIntf = subscription.getTopicExpression();
- TopicExpressionType tp =
- (TopicExpressionType) ( (XmlObjectWrapper) topicExpressionIntf ).getXmlObject();
- notificationMessageHolderType.setTopic( tp );
- message = notifyDoc;
- }
- else
- {
- message = XmlBeanUtils.toXmlObject( rawMsg );
- }
- Document document = toDocument( message );
-
- SOAPMessage soapMessage =
- buildSOAPMessage( document,
- (EndpointReferenceType) ( (XmlObjectWrapper) epr ).getXmlObject() );
-
- EMITTER_POOL.execute( EmitterTask.createEmitterTask( soapMessage,
- new URL( epr.getAddress().toString() ) ) );
+ result = ( (Boolean) queryResult ).booleanValue();
+ LOG.debug(
+ "Notification precondition '" + precondition + "' evaluated to " + result +
+ " for subscription with id " +
+ m_subscription.getID() +
+ "." );
+ }
+ catch ( RuntimeException re )
+ {
+ result = false;
+ LOG.error(
+ "Notification precondition '" + precondition +
+ "' did not evaluate to a Boolean at notification time." );
}
}
+ return result;
}
- private Document toDocument( XmlObject notifyDoc )
+ private Document toDomDocument( XmlObject notifyDoc )
throws ParserConfigurationException, SAXException, IOException
{
Document document = null;
- //notify is now populated
-
if ( XmlBeanUtils.isDocument( notifyDoc ) )
{
document = (Document) notifyDoc.newDomNode();
@@ -240,31 +234,17 @@
*
* @param topic DOCUMENT_ME
*/
- public void topicAdded( Topic topic )
- {
- //todo
- }
-
- /**
- * DOCUMENT_ME
- *
- * @param topic DOCUMENT_ME
- */
public void topicChanged( Topic topic )
{
- Subscription subscription = m_subscription;
-
- if ( subscription != null )
+ if ( m_subscription != null )
{
try
{
- this.notify( subscription,
- topic.getCurrentMessage() );
+ notify( topic.getCurrentMessage() );
}
catch ( Exception e )
{
- LOG.warn( "notificationFailed" + subscription );
- LOG.debug( "", e );
+ LOG.debug( "Notification for topic " + topic + " failed for subscription: " + m_subscription, e );
}
}
}
@@ -274,6 +254,16 @@
*
* @param topic DOCUMENT_ME
*/
+ public void topicAdded( Topic topic )
+ {
+ //todo
+ }
+
+ /**
+ * DOCUMENT_ME
+ *
+ * @param topic DOCUMENT_ME
+ */
public void topicRemoved( Topic topic )
{
//todo
@@ -291,35 +281,36 @@
throws Exception
{
SOAPFactory factory = SOAPFactory.newInstance();
-
- SOAPHeaderElement h =
+ // TODO: this should not be hard-coded to use WSA 2003/03
+ SOAPHeaderElement headerElem =
header.addHeaderElement( factory.createName( Addressing_03_2003_Constants.TO,
Addressing_03_2003_Constants.NSPREFIX_WSRL_SCHEMA,
Addressing_03_2003_Constants.NSURI_WSRL_SCHEMA ) );
-
- h.addTextNode( consumerEPR.getAddress().getStringValue() );
-
- h = header.addHeaderElement( factory.createName( Addressing_03_2003_Constants.ACTION,
+ headerElem.addTextNode( consumerEPR.getAddress().getStringValue() );
+ headerElem = header.addHeaderElement( factory.createName( Addressing_03_2003_Constants.ACTION,
Addressing_03_2003_Constants.NSPREFIX_WSRL_SCHEMA,
Addressing_03_2003_Constants.NSURI_WSRL_SCHEMA ) );
- h.addTextNode( BaseNotificationConstants.NOTIFY_ACTION_URL );
+ headerElem.addTextNode( BaseNotificationConstants.NOTIFY_ACTION_URL );
+ addReferencePropertyHeaders( header, consumerEPR );
+ }
+ private void addReferencePropertyHeaders( SOAPHeader header, EndpointReferenceType consumerEPR )
+ throws Exception
+ {
+ SOAPHeaderElement headerElem;
ReferencePropertiesType props = consumerEPR.getReferenceProperties();
if ( props != null )
{
XmlCursor cursor = props.newCursor();
-
- boolean haveChild = cursor.toFirstChild();
-
- while ( haveChild )
+ boolean hasAnotherChild = cursor.toFirstChild();
+ while ( hasAnotherChild )
{
- SOAPElement e = XmlBeanUtils.toSOAPElement( cursor.getObject() );
- h = header.addHeaderElement( e.getElementName() );
- h.addTextNode( e.getValue() );
-
- haveChild = cursor.toNextSibling();
+ // TODO: the below logic should handle refProps that are complexTypes
+ SOAPElement soapElem = XmlBeanUtils.toSOAPElement( cursor.getObject() );
+ headerElem = header.addHeaderElement( soapElem.getElementName() );
+ headerElem.addTextNode( soapElem.getValue() );
+ hasAnotherChild = cursor.toNextSibling();
}
-
cursor.dispose();
}
}
@@ -334,7 +325,6 @@
body.addDocument( fullMsgBodyElem );
SOAPHeader header = msg.getSOAPHeader();
addWSAHeaders( header, consumerEPR );
-
return msg;
}
}
Modified: incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/util/TopicUtils.java
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/util/TopicUtils.java?rev=168467&r1=168466&r2=168467&view=diff
==============================================================================
--- incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/util/TopicUtils.java (original)
+++ incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/util/TopicUtils.java Thu May 5 16:31:26 2005
@@ -15,31 +15,30 @@
*=============================================================================*/
package org.apache.ws.notification.topics.util;
-import org.apache.ws.notification.topics.TopicSpaceSet;
+import org.apache.ws.notification.base.v2004_6.porttype.NotificationProducerPortType;
import org.apache.ws.notification.topics.ResourcePropertyValueChangeTopic;
import org.apache.ws.notification.topics.Topic;
import org.apache.ws.notification.topics.TopicSpace;
-import org.apache.ws.notification.topics.TopicExpressionEngine;
-import org.apache.ws.notification.topics.v2004_06.TopicsConstants;
+import org.apache.ws.notification.topics.TopicSpaceSet;
import org.apache.ws.notification.topics.impl.ResourcePropertyValueChangeTopicImpl;
-import org.apache.ws.notification.topics.impl.TopicSpaceImpl;
import org.apache.ws.notification.topics.impl.ResourceTerminationTopicImpl;
import org.apache.ws.notification.topics.impl.TopicExpressionEngineImpl;
-import org.apache.ws.notification.base.v2004_6.porttype.NotificationProducerPortType;
-import org.apache.ws.resource.properties.ResourceProperty;
-import org.apache.ws.resource.properties.ResourcePropertySet;
-import org.apache.ws.resource.properties.NamespaceVersionHolder;
+import org.apache.ws.notification.topics.impl.TopicSpaceImpl;
+import org.apache.ws.notification.topics.v2004_06.TopicsConstants;
import org.apache.ws.resource.Resource;
import org.apache.ws.resource.lifetime.ResourceTerminationListener;
+import org.apache.ws.resource.properties.NamespaceVersionHolder;
+import org.apache.ws.resource.properties.ResourceProperty;
+import org.apache.ws.resource.properties.ResourcePropertySet;
import org.apache.ws.util.XmlBeanUtils;
import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.FixedTopicSetDocument;
import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.TopicDocument;
-import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.TopicExpressionType;
import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.TopicExpressionDialectsDocument;
+import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.TopicExpressionType;
import javax.xml.namespace.QName;
-import java.util.Iterator;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
/**
@@ -58,26 +57,24 @@
throws Exception
{
//there can be only 1 !
- String namespace = namespaces.getLifetimeXsdNamespace();
- Topic topic = null;
+ String namespace = namespaces.getLifetimeXsdNamespace();
TopicSpace topicSpace = topicSpaceSet.getTopicSpace(namespace);
if (topicSpace == null)
{
topicSpace = new TopicSpaceImpl(namespace);
topicSpaceSet.addTopicSpace(topicSpace);
}
+ Topic topic;
if (topicSpace.topicIterator().hasNext())
{
topic = (Topic) topicSpace.topicIterator().next();
}
else
{
- //create the topic
topic = new ResourceTerminationTopicImpl(namespaces);
resource.addTerminationListener((ResourceTerminationListener) topic);
topicSpace.addTopic(topic);
}
-
return topic;
}
Modified: incubator/hermes/trunk/src/java/org/apache/ws/pubsub/NotificationProducer.java
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/pubsub/NotificationProducer.java?rev=168467&r1=168466&r2=168467&view=diff
==============================================================================
--- incubator/hermes/trunk/src/java/org/apache/ws/pubsub/NotificationProducer.java (original)
+++ incubator/hermes/trunk/src/java/org/apache/ws/pubsub/NotificationProducer.java Thu May 5 16:31:26 2005
@@ -15,10 +15,9 @@
*/
package org.apache.ws.pubsub;
-//import org.apache.axis.message.addressing.EndpointReference;
import org.apache.ws.addressing.EndpointReference;
+
import java.util.Calendar;
-import java.net.URI;
public interface NotificationProducer
{
@@ -34,14 +33,15 @@
*
* @param notificationConsumer
* @param subscriptionEndConsumer the callback Interface for SubscriptionEnd Notifications, or null if no SubscriptionEnd should be sent
- * @param filters
+ * @param topicFilter
+ * @param xpathFilter
* @param initialTerminationTime
- * @param deliveryMode the notification delivery mode, or null to use default mode
+ * @param useNotify true to deliver each notification wrapped in a Notify message, false to deliver the raw message
*
* @return the subscription
*/
Subscription subscribe( NotificationConsumer notificationConsumer, SubscriptionEndConsumer subscriptionEndConsumer,
- TopicFilter tf, XPathFilter xf,
+ TopicFilter topicFilter, XPathFilter xpathFilter,
Calendar initialTerminationTime,
- boolean UseNotify);
+ boolean useNotify);
}
Modified: incubator/hermes/trunk/src/java/org/apache/ws/pubsub/Subscription.java
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/pubsub/Subscription.java?rev=168467&r1=168466&r2=168467&view=diff
==============================================================================
--- incubator/hermes/trunk/src/java/org/apache/ws/pubsub/Subscription.java (original)
+++ incubator/hermes/trunk/src/java/org/apache/ws/pubsub/Subscription.java Thu May 5 16:31:26 2005
@@ -15,7 +15,6 @@
*/
package org.apache.ws.pubsub;
-import java.net.URI;
import java.util.Calendar;
public interface Subscription
---------------------------------------------------------------------
To unsubscribe, e-mail: hermes-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: hermes-dev-help@ws.apache.org