You are viewing a plain text version of this content. The canonical link for it is here.
Posted to pubscribe-dev@ws.apache.org by ip...@apache.org on 2005/05/06 01:31:27 UTC

svn commit: r168467 - in /incubator/hermes/trunk: ./ src/java/org/apache/ws/eventing/porttype/impl/ src/java/org/apache/ws/eventing/services/ src/java/org/apache/ws/notification/base/ src/java/org/apache/ws/notification/base/impl/ src/java/org/apache/ws/notification/base/v2004_6/porttype/impl/ src/java/org/apache/ws/notification/topics/impl/ src/java/org/apache/ws/notification/topics/util/ src/java/org/apache/ws/pubsub/

Author: ips
Date: Thu May  5 16:31:26 2005
New Revision: 168467

URL: http://svn.apache.org/viewcvs?rev=168467&view=rev
Log:
added support for subscription precondition and selector; various minor

Modified:
    incubator/hermes/trunk/maven.xml
    incubator/hermes/trunk/src/java/org/apache/ws/eventing/porttype/impl/simpleNotificationPortTypeImpl.java
    incubator/hermes/trunk/src/java/org/apache/ws/eventing/services/BrokerService.java
    incubator/hermes/trunk/src/java/org/apache/ws/notification/base/NotificationProducerResource.java
    incubator/hermes/trunk/src/java/org/apache/ws/notification/base/Subscription.java
    incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java
    incubator/hermes/trunk/src/java/org/apache/ws/notification/base/v2004_6/porttype/impl/NotificationProducerPortTypeImpl.java
    incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java
    incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/util/TopicUtils.java
    incubator/hermes/trunk/src/java/org/apache/ws/pubsub/NotificationProducer.java
    incubator/hermes/trunk/src/java/org/apache/ws/pubsub/Subscription.java

Modified: incubator/hermes/trunk/maven.xml
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/maven.xml?rev=168467&r1=168466&r2=168467&view=diff
==============================================================================
--- incubator/hermes/trunk/maven.xml (original)
+++ incubator/hermes/trunk/maven.xml Thu May  5 16:31:26 2005
@@ -110,21 +110,20 @@
   </postGoal>
 
   <goal name="extract-wsdl-jar">
-    <echo>Extracting Wsdls from: ${wsdljar}</echo>
+    <echo>Extracting WSDLs from ${wsdljar} to ${spec.wsdl.dest.dir} ...</echo>
     <mkdir dir="${maven.build.dir}/temp-wsdl" />
     <mkdir dir="${spec.wsdl.dest.dir}" />   
-      <unzip src="${wsdljar}"
-             dest="${maven.build.dir}/temp-wsdl">
-          <patternset>
-              <include name="**/*.wsdl"/>
-              <include name="**/*.xsd"/>
-          </patternset>
-      </unzip>
-      <copy todir="${spec.wsdl.dest.dir}">
-      	<fileset dir="${maven.build.dir}/temp-wsdl/schema/src/src/wsdl"/>         	
-      </copy>
-  
-     <delete dir="${maven.build.dir}/temp-wsdl" />
+    <unzip src="${wsdljar}"
+           dest="${maven.build.dir}/temp-wsdl">
+      <patternset>
+        <include name="**/*.wsdl"/>
+        <include name="**/*.xsd"/>
+      </patternset>
+    </unzip>
+    <copy todir="${spec.wsdl.dest.dir}">
+      <fileset dir="${maven.build.dir}/temp-wsdl/schema" />         	
+    </copy>
+    <delete dir="${maven.build.dir}/temp-wsdl" />
   </goal>
 
   <!-- ================================================================== -->

Modified: incubator/hermes/trunk/src/java/org/apache/ws/eventing/porttype/impl/simpleNotificationPortTypeImpl.java
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/eventing/porttype/impl/simpleNotificationPortTypeImpl.java?rev=168467&r1=168466&r2=168467&view=diff
==============================================================================
--- incubator/hermes/trunk/src/java/org/apache/ws/eventing/porttype/impl/simpleNotificationPortTypeImpl.java (original)
+++ incubator/hermes/trunk/src/java/org/apache/ws/eventing/porttype/impl/simpleNotificationPortTypeImpl.java Thu May  5 16:31:26 2005
@@ -13,25 +13,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.ws.eventing.porttype.impl;
 
 import org.apache.axis.message.SOAPEnvelope;
-import org.apache.ws.resource.properties.impl.AbstractResourcePropertiesPortType;
-import org.apache.ws.resource.ResourceContext;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.ws.eventing.Subscription;
-
-
 import org.apache.ws.eventing.SubscriptionHome;
 
-
 import javax.naming.Context;
 import javax.naming.InitialContext;
 
-
-public class simpleNotificationPortTypeImpl implements org.apache.ws.eventing.porttype.NotificationPortType{
+public class SimpleNotificationPortTypeImpl implements org.apache.ws.eventing.porttype.NotificationPortType{
     private static final Log LOG = LogFactory.getLog( NotificationPortTypeImpl.class.getName() );
     private boolean m_broker;
     

Modified: incubator/hermes/trunk/src/java/org/apache/ws/eventing/services/BrokerService.java
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/eventing/services/BrokerService.java?rev=168467&r1=168466&r2=168467&view=diff
==============================================================================
--- incubator/hermes/trunk/src/java/org/apache/ws/eventing/services/BrokerService.java (original)
+++ incubator/hermes/trunk/src/java/org/apache/ws/eventing/services/BrokerService.java Thu May  5 16:31:26 2005
@@ -53,7 +53,7 @@
     }    
     
     public void filter(org.apache.axis.message.SOAPEnvelope req, org.apache.axis.message.SOAPEnvelope resp) {
-        new simpleNotificationPortTypeImpl().filter(req, resp);
+        new SimpleNotificationPortTypeImpl().filter(req, resp);
     }    
     
 }

Modified: incubator/hermes/trunk/src/java/org/apache/ws/notification/base/NotificationProducerResource.java
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/notification/base/NotificationProducerResource.java?rev=168467&r1=168466&r2=168467&view=diff
==============================================================================
--- incubator/hermes/trunk/src/java/org/apache/ws/notification/base/NotificationProducerResource.java (original)
+++ incubator/hermes/trunk/src/java/org/apache/ws/notification/base/NotificationProducerResource.java Thu May  5 16:31:26 2005
@@ -16,13 +16,14 @@
 package org.apache.ws.notification.base;
 
 import org.apache.ws.notification.topics.TopicSpaceSet;
-import org.apache.ws.resource.Resource;
+import org.apache.ws.resource.PropertiesResource;
 
 /**
- * A resource that implements the WS-BaseNotification NotificationProducer portType.
- * Provides access to the set of TopicSpaces supported by the resource.
+ * A resource that implements the WS-BaseNotification NotificationProducer
+ * portType. Provides access to the resource's {@link TopicSpaceSet} and
+ * {@link org.apache.ws.resource.properties.ResourcePropertySet}.
  */
-public interface NotificationProducerResource extends Resource
+public interface NotificationProducerResource extends PropertiesResource
 {
 
     TopicSpaceSet getTopicSpaceSet();

Modified: incubator/hermes/trunk/src/java/org/apache/ws/notification/base/Subscription.java
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/notification/base/Subscription.java?rev=168467&r1=168466&r2=168467&view=diff
==============================================================================
--- incubator/hermes/trunk/src/java/org/apache/ws/notification/base/Subscription.java (original)
+++ incubator/hermes/trunk/src/java/org/apache/ws/notification/base/Subscription.java Thu May  5 16:31:26 2005
@@ -20,7 +20,6 @@
 import org.apache.ws.pubsub.NotificationConsumer;
 import org.apache.ws.pubsub.NotificationProducer;
 import org.apache.ws.resource.PropertiesResource;
-import org.apache.ws.resource.Resource;
 import org.apache.ws.resource.lifetime.ScheduledResourceTerminationResource;
 import org.apache.ws.resource.properties.query.QueryExpression;
 
@@ -64,13 +63,14 @@
     
     public void pause() throws Exception;
     
-    public void resume() throws Exception;     
+    public void resume() throws Exception;
+
     /**
-     * Get the producing resource.
+     * Get the producer resource associated with this subscription.
      *
-     * @return the producing resource
+     * @return the producer resource
      */
-    Resource getResource() throws Exception;
+    NotificationProducerResource getProducerResource() throws Exception;
 
     void setEpr(EndpointReference epr);
 

Modified: incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java?rev=168467&r1=168466&r2=168467&view=diff
==============================================================================
--- incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java (original)
+++ incubator/hermes/trunk/src/java/org/apache/ws/notification/base/impl/AbstractSubscription.java Thu May  5 16:31:26 2005
@@ -42,7 +42,6 @@
 import org.apache.ws.util.uuid.UuidGenerator;
 import org.apache.ws.util.uuid.UuidGeneratorFactory;
 
-import javax.naming.Context;
 import javax.naming.InitialContext;
 import javax.xml.rpc.JAXRPCException;
 import java.net.URI;
@@ -132,11 +131,10 @@
         return m_producerReference;
     }
 
-    public Resource getResource() throws Exception
+    public NotificationProducerResource getProducerResource() throws Exception
     {
-        Context initialContext = new InitialContext();
-        ResourceHome producerHome = (ResourceHome) initialContext.lookup( m_producerHomeLocation );
-        return producerHome.find( m_producerKey );
+        ResourceHome producerHome = (ResourceHome) new InitialContext().lookup( m_producerHomeLocation );
+        return (NotificationProducerResource) producerHome.find( m_producerKey );
     }
 
     public QueryExpression getSelector()
@@ -228,7 +226,7 @@
         Resource producerResource = null;
         try
         {
-            producerResource = getResource();
+            producerResource = getProducerResource();
         }
         catch ( Exception e )
         {
@@ -287,7 +285,7 @@
 
     private Topic[] evaluateTopicExpression() throws Exception
     {
-        NotificationProducerResource resource = (NotificationProducerResource) getResource();
+        NotificationProducerResource resource = (NotificationProducerResource) getProducerResource();
         TopicSpaceSet topicSpaceSet = resource.getTopicSpaceSet();
         try
         {

Modified: incubator/hermes/trunk/src/java/org/apache/ws/notification/base/v2004_6/porttype/impl/NotificationProducerPortTypeImpl.java
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/notification/base/v2004_6/porttype/impl/NotificationProducerPortTypeImpl.java?rev=168467&r1=168466&r2=168467&view=diff
==============================================================================
--- incubator/hermes/trunk/src/java/org/apache/ws/notification/base/v2004_6/porttype/impl/NotificationProducerPortTypeImpl.java (original)
+++ incubator/hermes/trunk/src/java/org/apache/ws/notification/base/v2004_6/porttype/impl/NotificationProducerPortTypeImpl.java Thu May  5 16:31:26 2005
@@ -44,7 +44,12 @@
 import org.apache.ws.resource.impl.AbstractResourceContext;
 import org.apache.ws.resource.properties.NamespaceVersionHolder;
 import org.apache.ws.resource.properties.impl.AbstractResourcePropertiesPortType;
+import org.apache.ws.resource.properties.query.QueryEngine;
 import org.apache.ws.resource.properties.query.QueryExpression;
+import org.apache.ws.resource.properties.query.UnknownQueryExpressionDialectException;
+import org.apache.ws.resource.properties.query.QueryEvaluationErrorException;
+import org.apache.ws.resource.properties.query.InvalidQueryExpressionException;
+import org.apache.ws.resource.properties.query.impl.QueryEngineImpl;
 import org.apache.ws.resource.properties.query.impl.XmlBeansQueryExpression;
 import org.apache.ws.util.XmlBeanUtils;
 import org.apache.xmlbeans.XmlObject;
@@ -53,6 +58,8 @@
 import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.SubscribeDocument;
 import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.SubscribeResponseDocument;
 import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.TopicExpressionType;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
 import org.xmlsoap.schemas.ws.x2003.x03.addressing.AttributedURI;
 import org.xmlsoap.schemas.ws.x2003.x03.addressing.EndpointReferenceDocument;
 import org.xmlsoap.schemas.ws.x2003.x03.addressing.EndpointReferenceType;
@@ -60,6 +67,8 @@
 
 import javax.naming.Context;
 import javax.naming.InitialContext;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
 import javax.xml.rpc.JAXRPCException;
 import java.util.Calendar;
 
@@ -72,6 +81,23 @@
         implements NotificationProducerPortType, NotificationProducer
 {
 
+    private static final QueryEngine QUERY_ENGINE = new QueryEngineImpl();
+    private static final Element TEST_EVAL_CONTEXT;  // DOM element used to validate selector and precondition XPaths
+
+    static
+    {
+        Document doc = null;
+        try
+        {
+            doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument();
+        }
+        catch ( ParserConfigurationException pce )
+        {
+            throw new IllegalStateException( "Failed to create DOM document." );
+        }
+        TEST_EVAL_CONTEXT = doc.createElement( "Foo" );
+    }
+
     public NotificationProducerPortTypeImpl( ResourceContext resourceContext )
     {
         super( resourceContext );
@@ -80,120 +106,90 @@
     public SubscribeResponseDocument subscribe( SubscribeDocument requestDoc )
     {
         SubscribeDocument.Subscribe request = requestDoc.getSubscribe();
-
-        //make sure to check if optional elements are set in request...
         Calendar initialTerminationTime = null;
-        if(request.getInitialTerminationTime() != null)
+        if ( request.isSetInitialTerminationTime() )
         {
             initialTerminationTime = request.getInitialTerminationTime();
         }
-
-        QueryExpression precondition = null;
-        if(request.getPrecondition() != null)
-        {
-            precondition = new XmlBeansQueryExpression( request.getPrecondition() );
-        }
-
-        QueryExpression selector = null;
-        if(request.getSelector() != null)
-        {
-            selector = new XmlBeansQueryExpression( request.getSelector() );
-        }
-
         XmlObject subPolicy = null;
-        if(request.getSubscriptionPolicy() != null)
+        if ( request.isSetSubscriptionPolicy() )
         {
             subPolicy = request.getSubscriptionPolicy();
         }
-
         EndpointReferenceType epr = null;
         try
         {
+            QueryExpression precondition = null;
+            if ( request.isSetPrecondition() )
+            {
+                precondition = new XmlBeansQueryExpression( request.getPrecondition() );
+                validateQueryExpressionEvaluatesToBoolean( precondition );
+            }
+            QueryExpression selector = null;
+            if ( request.isSetSelector() )
+            {
+                selector = new XmlBeansQueryExpression( request.getSelector() );
+                validateQueryExpressionEvaluatesToBoolean( selector );
+            }
             TopicExpression topicExpr = new XmlBeansTopicExpression( request.getTopicExpression() );
-            //this prop has default values in schema..it "should" get a value either way if not explicitly set in request
+            // this prop has default values in schema..it "should" get a value either way if not explicitly set in request
             boolean useNotify = request.isSetUseNotify() ? request.getUseNotify() : true;
             EndpointReference producerEPR = buildEPR( getResourceContext() );
 
-            Context initialContext = new InitialContext(  );
-            SubscriptionHome subscriptionHome = (SubscriptionHome) initialContext.lookup( SubscriptionHome.HOME_LOCATION );
-          
-            Subscription subscription = subscriptionHome.create(SubscriptionResource.class, new XmlBeansEndpointReference(request.getConsumerReference()), producerEPR, initialTerminationTime,subPolicy, precondition, selector,getResourceKey(), ((AbstractResourceContext)getResourceContext()).getResourceHomeLocation(),topicExpr,useNotify );
-            subscription.setNotificationProducer(this);
-            epr = (EndpointReferenceType) ((XmlObjectWrapper)subscription.getEpr()).getXmlObject();
-
-            Topic[] topics = evaluateTopicExpression(topicExpr);
-            SimpleSubscriptionTopicListener simpleSubscriptionTopicListener = new SimpleSubscriptionTopicListener(subscription);
+            Context initialContext = new InitialContext();
+            SubscriptionHome subscriptionHome = (SubscriptionHome) initialContext.lookup(
+                    SubscriptionHome.HOME_LOCATION );
+
+            Subscription subscription = subscriptionHome.create( SubscriptionResource.class,
+                    new XmlBeansEndpointReference( request.getConsumerReference() ), producerEPR, initialTerminationTime,
+                    subPolicy, precondition, selector, getResourceKey(),
+                    ( (AbstractResourceContext) getResourceContext() ).getResourceHomeLocation(), topicExpr, useNotify );
+            subscription.setNotificationProducer( this );
+            epr = (EndpointReferenceType) ( (XmlObjectWrapper) subscription.getEpr() ).getXmlObject();
+
+            Topic[] topics = evaluateTopicExpression( topicExpr );
+            SimpleSubscriptionTopicListener simpleSubscriptionTopicListener = new SimpleSubscriptionTopicListener(
+                    subscription );
             for ( int i = 0; i < topics.length; i++ )
             {
-                topics[i].addTopicListener(simpleSubscriptionTopicListener);
+                topics[i].addTopicListener( simpleSubscriptionTopicListener );
             }
         }
-        catch (Exception e)
+        catch ( Exception e )
         {
-            throw new SubscribeCreationFailedFaultException(new WsnNamespaceVersionHolderImpl(),"Subscribe failed, reason: " + e.getLocalizedMessage());
+            throw new SubscribeCreationFailedFaultException( new WsnNamespaceVersionHolderImpl(),
+                    "Subscribe failed, reason: " + e.getLocalizedMessage() );
         }
-
         SubscribeResponseDocument subscribeResponseDoc = SubscribeResponseDocument.Factory.newInstance();
         SubscribeResponseDocument.SubscribeResponse subscribeResponse = subscribeResponseDoc.addNewSubscribeResponse();
-        subscribeResponse.setSubscriptionReference( epr);
+        subscribeResponse.setSubscriptionReference( epr );
         return subscribeResponseDoc;
     }
 
-    private Topic[] evaluateTopicExpression(TopicExpression topicExpr)
-    {
-        NotificationProducerResource resource = (NotificationProducerResource) getResource();
-        TopicSpaceSet topicSpaceSet = resource.getTopicSpaceSet();
-        try
-        {
-            return topicSpaceSet.evaluateTopicExpression(topicExpr);
-        }
-        catch (TopicExpressionException e)
-        {
-            throw new JAXRPCException("An exception occurred evaluating a TopicExpression. ", e);
-        }
-    }
-
-    private static XmlBeansEndpointReference buildEPR( ResourceContext resourceContext )
-    {
-        EndpointReferenceDocument eprDoc = EndpointReferenceDocument.Factory.newInstance();
-        EndpointReferenceType epr = eprDoc.addNewEndpointReference();
-        AttributedURI address = epr.addNewAddress();
-        address.setStringValue( resourceContext.getServiceURL().toString() );
-        // TODO: set portType and serviceName! (read from config file)
-        ReferencePropertiesType refProps = epr.addNewReferenceProperties();
-        try
-        {
-            XmlBeanUtils.addChildElement( refProps, resourceContext.getResourceKey().getName() );
-        }
-        catch ( ResourceContextException rce )
-        {
-            throw new JAXRPCException( rce );
-        }
-        return new XmlBeansEndpointReference(epr);
-    }
-
     public GetCurrentMessageResponseDocument getCurrentMessage( GetCurrentMessageDocument requestDoc )
     {
         GetCurrentMessageDocument.GetCurrentMessage request = requestDoc.getGetCurrentMessage();
         TopicExpressionType topicExprXmlBean = request.getTopic();
         TopicExpression topicExpr = new XmlBeansTopicExpression( topicExprXmlBean );
-        Topic[] topics = evaluateTopicExpression(topicExpr);
+        Topic[] topics = evaluateTopicExpression( topicExpr );
 
-        if(topics.length == 0)
+        if ( topics.length == 0 )
         {
-            throw new NoCurrentMessageOnTopicFaultException(new WsnNamespaceVersionHolderImpl(),"There was no current message to retrieve.");
+            throw new NoCurrentMessageOnTopicFaultException( new WsnNamespaceVersionHolderImpl(),
+                    "There was no current message to retrieve." );
         }
 
         GetCurrentMessageResponseDocument message = GetCurrentMessageResponseDocument.Factory.newInstance();
-            GetCurrentMessageResponseDocument.GetCurrentMessageResponse response = message.addNewGetCurrentMessageResponse();
-        if(topics.length > 1)
+        GetCurrentMessageResponseDocument.GetCurrentMessageResponse response = message.addNewGetCurrentMessageResponse();
+        if ( topics.length > 1 )
         {
-            throw new JAXRPCException("The topicexpression for GetCurrentMessage returned more than 1 value!  This is an invalid request.");
+            throw new JAXRPCException(
+                    "The topicexpression for GetCurrentMessage returned more than 1 value!  This is an invalid request." );
         }
         else
         {
             XmlObject currentMessage = (XmlObject) topics[0].getCurrentMessage();
-            XmlBeanUtils.addChildElement(response, currentMessage);
+            XmlBeanUtils.addChildElement( response, currentMessage );
         }
 
         //todo need to throw appropriate exceptions...
@@ -207,14 +203,59 @@
      */
     public EndpointReference getEPR()
     {
-        return buildEPR(getResourceContext());
+        return buildEPR( getResourceContext() );
+    }
+
+    private void validateQueryExpressionEvaluatesToBoolean( QueryExpression precondition )
+            throws UnknownQueryExpressionDialectException, QueryEvaluationErrorException,
+            InvalidQueryExpressionException
+    {
+        Object queryResult = QUERY_ENGINE.executeQuery( precondition, TEST_EVAL_CONTEXT );
+        if ( !( queryResult instanceof Boolean ) )
+        {
+            throw new IllegalStateException( "Invalid precondition - expression does not return a Boolean." );
+        }
+    }
+
+    private Topic[] evaluateTopicExpression( TopicExpression topicExpr )
+    {
+        NotificationProducerResource resource = (NotificationProducerResource) getResource();
+        TopicSpaceSet topicSpaceSet = resource.getTopicSpaceSet();
+        try
+        {
+            return topicSpaceSet.evaluateTopicExpression( topicExpr );
+        }
+        catch ( TopicExpressionException e )
+        {
+            throw new JAXRPCException( "An exception occurred evaluating a TopicExpression. ", e );
+        }
+    }
+
+    private static XmlBeansEndpointReference buildEPR( ResourceContext resourceContext )
+    {
+        EndpointReferenceDocument eprDoc = EndpointReferenceDocument.Factory.newInstance();
+        EndpointReferenceType epr = eprDoc.addNewEndpointReference();
+        AttributedURI address = epr.addNewAddress();
+        address.setStringValue( resourceContext.getServiceURL().toString() );
+        // TODO: set portType and serviceName! (read from config file)
+        ReferencePropertiesType refProps = epr.addNewReferenceProperties();
+        try
+        {
+            XmlBeanUtils.addChildElement( refProps, resourceContext.getResourceKey().getName() );
+        }
+        catch ( ResourceContextException rce )
+        {
+            throw new JAXRPCException( rce );
+        }
+        return new XmlBeansEndpointReference( epr );
     }
 
     /**
      * Subscribe to notifications from this producer.
      *
      * @param notificationConsumer
-     * @param subscriptionEndConsumer the callback Interface for SubscriptionEnd Notifications, or null if no SubscriptionEnd should be send
+     * @param subscriptionEndConsumer the callback Interface for SubscriptionEnd Notifications, or null if no
+     *                                SubscriptionEnd should be send
      * @param tf
      * @param xf
      * @param initialTerminationTime
@@ -222,7 +263,10 @@
      *
      * @return the subscription
      */
-    public org.apache.ws.pubsub.Subscription subscribe(NotificationConsumer notificationConsumer, SubscriptionEndConsumer subscriptionEndConsumer, TopicFilter tf, XPathFilter xf, Calendar initialTerminationTime, boolean useNotify)
+    public org.apache.ws.pubsub.Subscription subscribe( NotificationConsumer notificationConsumer,
+                                                        SubscriptionEndConsumer subscriptionEndConsumer,
+                                                        TopicFilter tf, XPathFilter xf,
+                                                        Calendar initialTerminationTime, boolean useNotify )
     {
         return null;    //todo
     }
@@ -234,10 +278,14 @@
      * @param filters
      * @param initialTerminationTime
      * @param deliveryMode           the notification delivery mode, or null to use default mode
-     * @param policy                 a policy to be associated with the subscription, or null if no policy should be used
+     * @param policy                 a policy to be associated with the subscription, or null if no policy should be
+     *                               used
+     *
      * @return the subscription
      */
-    public org.apache.ws.pubsub.Subscription subscribe(NotificationConsumer notificationConsumer, Filter filters[], Calendar initialTerminationTime, DeliveryMode deliveryMode, Object policy)
+    public org.apache.ws.pubsub.Subscription subscribe( NotificationConsumer notificationConsumer, Filter filters[],
+                                                        Calendar initialTerminationTime, DeliveryMode deliveryMode,
+                                                        Object policy )
     {
         return null;
     }
@@ -246,9 +294,10 @@
      * Returns the last notification message published for the given set of filters.
      *
      * @param filters
+     *
      * @return
      */
-    public Object getCurrentMessage(Filter filters[])
+    public Object getCurrentMessage( Filter filters[] )
     {
         return null;
     }

Modified: incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java?rev=168467&r1=168466&r2=168467&view=diff
==============================================================================
--- incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java (original)
+++ incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/impl/SimpleSubscriptionTopicListener.java Thu May  5 16:31:26 2005
@@ -19,7 +19,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ws.XmlObjectWrapper;
-import org.apache.ws.resource.properties.query.QueryExpression;
 import org.apache.ws.addressing.Addressing_03_2003_Constants;
 import org.apache.ws.addressing.EndpointReference;
 import org.apache.ws.notification.base.Subscription;
@@ -27,8 +26,14 @@
 import org.apache.ws.notification.topics.Topic;
 import org.apache.ws.notification.topics.TopicExpression;
 import org.apache.ws.notification.topics.TopicListener;
-import org.apache.ws.pubsub.NotificationConsumer;
 import org.apache.ws.pubsub.emitter.EmitterTask;
+import org.apache.ws.resource.properties.ResourcePropertySet;
+import org.apache.ws.resource.properties.query.InvalidQueryExpressionException;
+import org.apache.ws.resource.properties.query.QueryEngine;
+import org.apache.ws.resource.properties.query.QueryEvaluationErrorException;
+import org.apache.ws.resource.properties.query.QueryExpression;
+import org.apache.ws.resource.properties.query.UnknownQueryExpressionDialectException;
+import org.apache.ws.resource.properties.query.impl.QueryEngineImpl;
 import org.apache.ws.util.JaxpUtils;
 import org.apache.ws.util.XmlBeanUtils;
 import org.apache.ws.util.thread.NamedThread;
@@ -54,7 +59,6 @@
 import javax.xml.soap.SOAPMessage;
 import java.io.IOException;
 import java.io.Serializable;
-import java.net.URI;
 import java.net.URL;
 
 /**
@@ -64,7 +68,6 @@
  */
 public class SimpleSubscriptionTopicListener
         implements TopicListener,
-        NotificationConsumer,
         Serializable
 {
     private static Log LOG = LogFactory.getLog( SimpleSubscriptionTopicListener.class.getName() );
@@ -80,8 +83,9 @@
         EMITTER_POOL.setThreadFactory( new NamedThread.ConcurrentThreadFactory( "notifmgr-emitter", false ) );
     }
 
+    private static final QueryEngine QUERY_ENGINE = new QueryEngineImpl();
+
     private Subscription m_subscription;
-    private EndpointReference m_epr;
 
     /**
      * Construct a listener instance.
@@ -90,29 +94,7 @@
      */
     public SimpleSubscriptionTopicListener( Subscription subscription )
     {
-        this.m_subscription = subscription;
-        subscription.setNotificationConsumer( this );
-        m_epr = subscription.getConsumerReference();
-    }
-
-    /**
-     * DOCUMENT_ME
-     *
-     * @return DOCUMENT_ME
-     */
-    public EndpointReference getEPR()
-    {
-        return m_epr;
-    }
-
-    /**
-     * what's this for??
-     *
-     * @return
-     */
-    public int getMode()
-    {
-        return 0;
+        m_subscription = subscription;
     }
 
     /**
@@ -120,109 +102,121 @@
      */
     public Subscription getSubscription()
     {
-        return this.m_subscription;
+        return m_subscription;
     }
 
-    /**
-     * DOCUMENT_ME
-     *
-     * @param subscription DOCUMENT_ME
-     * @param status       DOCUMENT_ME
-     * @param reason       DOCUMENT_ME
-     */
-    public void end( org.apache.ws.pubsub.Subscription subscription,
-                     URI status,
-                     String reason )
+    private void notify( Object rawMsg )
+            throws Exception
     {
+        synchronized ( m_subscription )
+        {
+            if ( !m_subscription.isPaused() )
+            {
+                LOG.debug( "Notification being sent for subscription with id " + m_subscription.getID()
+                        + "; message value: " + rawMsg );
+                XmlObject msg = XmlBeanUtils.toXmlObject( rawMsg );
+                if ( evaluateSelector( m_subscription.getSelector(), msg ) &&
+                     evaluatePrecondition( m_subscription.getPrecondition(),
+                                m_subscription.getProducerResource().getResourcePropertySet() ) )
+                {
+                    if ( m_subscription.getUseNotify() )  // wrap message in Notify element
+                    {
+                        msg = wrapMessageWithNotify( msg );
+                    }
+                    Document document = toDomDocument( msg );
+                    EndpointReference consumerEPR = m_subscription.getNotificationConsumer().getEPR();
+                    SOAPMessage soapMessage =
+                            buildSOAPMessage( document,
+                                    (EndpointReferenceType) ( (XmlObjectWrapper) consumerEPR ).getXmlObject() );
+                    EMITTER_POOL.execute( EmitterTask.createEmitterTask( soapMessage,
+                            new URL( consumerEPR.getAddress().toString() ) ) );
+                }
+            }
+        }
     }
 
-    /**
-     * DOCUMENT_ME
-     *
-     * @param subscription DOCUMENT_ME
-     * @param message      DOCUMENT_ME
-     */
-    public void notify( org.apache.ws.pubsub.Subscription subscription,
-                        Object message )
+    private XmlObject wrapMessageWithNotify( XmlObject msg )
     {
-        try
+        NotifyDocument notifyDoc = NotifyDocument.Factory.newInstance();
+        NotifyDocument.Notify notify = notifyDoc.addNewNotify();
+        NotificationMessageHolderType notificationMessageHolderType = notify.addNewNotificationMessage();
+        notificationMessageHolderType.setMessage( msg );
+        notificationMessageHolderType.setProducerReference( (EndpointReferenceType) ( (XmlObjectWrapper) m_subscription.getNotificationProducer()
+                .getEPR() )
+                .getXmlObject() );
+        TopicExpression topicExpressionIntf = m_subscription.getTopicExpression();
+        TopicExpressionType tp =
+                (TopicExpressionType) ( (XmlObjectWrapper) topicExpressionIntf ).getXmlObject();
+        notificationMessageHolderType.setTopic( tp );
+        msg = notifyDoc;
+        return msg;
+    }
+
+    private boolean evaluateSelector( QueryExpression selector, XmlObject msg )
+            throws UnknownQueryExpressionDialectException, QueryEvaluationErrorException,
+            InvalidQueryExpressionException
+    {
+        boolean result;
+        if ( selector == null )
         {
-            notify( (Subscription) subscription, message );
+            result = true;
         }
-        catch ( Exception e )
+        else
         {
-            throw new RuntimeException( e );
+            Object queryResult = QUERY_ENGINE.executeQuery( selector, msg );
+            try
+            {
+                result = ( (Boolean) queryResult ).booleanValue();
+            }
+            catch ( RuntimeException re )
+            {
+                result = false;
+                LOG.error(
+                        "Notification selector '" + selector + "' did not evaluate to a Boolean at notification time." );
+            }
+            LOG.debug(
+                    "Notification selector '" + selector + "' evaluated to " + result + " for subscription with id " +
+                    m_subscription.getID() +
+                    "." );
         }
+        return result;
     }
 
-    /**
-     * Send a notification
-     *
-     * @param subscription The subscription for which to send the notification
-     * @param rawMsg       The new value of the topic
-     *
-     * @throws Exception
-     */
-    public void notify( Subscription subscription,
-                        Object rawMsg )
+    private boolean evaluatePrecondition( QueryExpression precondition, ResourcePropertySet propSet )
             throws Exception
     {
-        XmlObject message = null;
-        synchronized ( subscription )
+        boolean result;
+        if ( precondition == null )
+        {
+            result = true;
+        }
+        else
         {
-            if ( !subscription.isPaused() )
+            Object queryResult = QUERY_ENGINE.executeQuery( precondition, propSet );
+            try
             {
-                LOG.debug( "Notification being sent for subscription with Id " + subscription.getID()
-                        + "; message value: " + rawMsg );
-                if ( subscription.getSelector() != null )
-                {
-                    QueryExpression selector = subscription.getSelector();
-                    
-                }
-                EndpointReference epr =
-                        subscription.getNotificationConsumer().getEPR();
-                if ( subscription.getUseNotify() )
-                {
-                    NotifyDocument notifyDoc = NotifyDocument.Factory.newInstance();
-                    NotifyDocument.Notify notify = notifyDoc.addNewNotify();
-                    NotificationMessageHolderType notificationMessageHolderType = notify.addNewNotificationMessage();
-
-                    //assumes xmlobject for rawMsg...this needs to change
-                    notificationMessageHolderType.setMessage( (XmlObject) rawMsg );
-
-                    //set the producer ref
-                    notificationMessageHolderType.setProducerReference( (EndpointReferenceType) ( (XmlObjectWrapper) subscription.getNotificationProducer()
-                            .getEPR() )
-                            .getXmlObject() );
-
-                    TopicExpression topicExpressionIntf = subscription.getTopicExpression();
-                    TopicExpressionType tp =
-                            (TopicExpressionType) ( (XmlObjectWrapper) topicExpressionIntf ).getXmlObject();
-                    notificationMessageHolderType.setTopic( tp );
-                    message = notifyDoc;
-                }
-                else
-                {
-                    message = XmlBeanUtils.toXmlObject( rawMsg );
-                }
-                Document document = toDocument( message );
-
-                SOAPMessage soapMessage =
-                        buildSOAPMessage( document,
-                                (EndpointReferenceType) ( (XmlObjectWrapper) epr ).getXmlObject() );
-
-                EMITTER_POOL.execute( EmitterTask.createEmitterTask( soapMessage,
-                        new URL( epr.getAddress().toString() ) ) );
+                result = ( (Boolean) queryResult ).booleanValue();
+                LOG.debug(
+                        "Notification precondition '" + precondition + "' evaluated to " + result +
+                        " for subscription with id " +
+                        m_subscription.getID() +
+                        "." );
+            }
+            catch ( RuntimeException re )
+            {
+                result = false;
+                LOG.error(
+                        "Notification precondition '" + precondition +
+                        "' did not evaluate to a Boolean at notification time." );
             }
         }
+        return result;
     }
 
-    private Document toDocument( XmlObject notifyDoc )
+    private Document toDomDocument( XmlObject notifyDoc )
             throws ParserConfigurationException, SAXException, IOException
     {
         Document document = null;
-        //notify is now populated
-
         if ( XmlBeanUtils.isDocument( notifyDoc ) )
         {
             document = (Document) notifyDoc.newDomNode();
@@ -240,31 +234,17 @@
      *
      * @param topic DOCUMENT_ME
      */
-    public void topicAdded( Topic topic )
-    {
-        //todo
-    }
-
-    /**
-     * DOCUMENT_ME
-     *
-     * @param topic DOCUMENT_ME
-     */
     public void topicChanged( Topic topic )
     {
-        Subscription subscription = m_subscription;
-
-        if ( subscription != null )
+        if ( m_subscription != null )
         {
             try
             {
-                this.notify( subscription,
-                        topic.getCurrentMessage() );
+                notify( topic.getCurrentMessage() );
             }
             catch ( Exception e )
             {
-                LOG.warn( "notificationFailed" + subscription );
-                LOG.debug( "", e );
+                LOG.debug( "Notification for topic " + topic + " failed for subscription: " + m_subscription, e );
             }
         }
     }
@@ -274,6 +254,16 @@
      *
      * @param topic DOCUMENT_ME
      */
+    public void topicAdded( Topic topic )
+    {
+        //todo
+    }
+
+    /**
+     * DOCUMENT_ME
+     *
+     * @param topic DOCUMENT_ME
+     */
     public void topicRemoved( Topic topic )
     {
         //todo
@@ -291,35 +281,36 @@
             throws Exception
     {
         SOAPFactory factory = SOAPFactory.newInstance();
-
-        SOAPHeaderElement h =
+        // TODO: this should not be hard-coded to use WSA 2003/03
+        SOAPHeaderElement headerElem =
                 header.addHeaderElement( factory.createName( Addressing_03_2003_Constants.TO,
                         Addressing_03_2003_Constants.NSPREFIX_WSRL_SCHEMA,
                         Addressing_03_2003_Constants.NSURI_WSRL_SCHEMA ) );
-
-        h.addTextNode( consumerEPR.getAddress().getStringValue() );
-
-        h = header.addHeaderElement( factory.createName( Addressing_03_2003_Constants.ACTION,
+        headerElem.addTextNode( consumerEPR.getAddress().getStringValue() );
+        headerElem = header.addHeaderElement( factory.createName( Addressing_03_2003_Constants.ACTION,
                 Addressing_03_2003_Constants.NSPREFIX_WSRL_SCHEMA,
                 Addressing_03_2003_Constants.NSURI_WSRL_SCHEMA ) );
-        h.addTextNode( BaseNotificationConstants.NOTIFY_ACTION_URL );
+        headerElem.addTextNode( BaseNotificationConstants.NOTIFY_ACTION_URL );
+        addReferencePropertyHeaders( header, consumerEPR );
+    }
 
+    private void addReferencePropertyHeaders( SOAPHeader header, EndpointReferenceType consumerEPR )
+            throws Exception
+    {
+        SOAPHeaderElement headerElem;
         ReferencePropertiesType props = consumerEPR.getReferenceProperties();
         if ( props != null )
         {
             XmlCursor cursor = props.newCursor();
-
-            boolean haveChild = cursor.toFirstChild();
-
-            while ( haveChild )
+            boolean hasAnotherChild = cursor.toFirstChild();
+            while ( hasAnotherChild )
             {
-                SOAPElement e = XmlBeanUtils.toSOAPElement( cursor.getObject() );
-                h = header.addHeaderElement( e.getElementName() );
-                h.addTextNode( e.getValue() );
-
-                haveChild = cursor.toNextSibling();
+                // TODO: the below logic should handle refProps that are complexTypes
+                SOAPElement soapElem = XmlBeanUtils.toSOAPElement( cursor.getObject() );
+                headerElem = header.addHeaderElement( soapElem.getElementName() );
+                headerElem.addTextNode( soapElem.getValue() );
+                hasAnotherChild = cursor.toNextSibling();
             }
-
             cursor.dispose();
         }
     }
@@ -334,7 +325,6 @@
         body.addDocument( fullMsgBodyElem );
         SOAPHeader header = msg.getSOAPHeader();
         addWSAHeaders( header, consumerEPR );
-
         return msg;
     }
 }

Modified: incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/util/TopicUtils.java
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/util/TopicUtils.java?rev=168467&r1=168466&r2=168467&view=diff
==============================================================================
--- incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/util/TopicUtils.java (original)
+++ incubator/hermes/trunk/src/java/org/apache/ws/notification/topics/util/TopicUtils.java Thu May  5 16:31:26 2005
@@ -15,31 +15,30 @@
  *=============================================================================*/
 package org.apache.ws.notification.topics.util;
 
-import org.apache.ws.notification.topics.TopicSpaceSet;
+import org.apache.ws.notification.base.v2004_6.porttype.NotificationProducerPortType;
 import org.apache.ws.notification.topics.ResourcePropertyValueChangeTopic;
 import org.apache.ws.notification.topics.Topic;
 import org.apache.ws.notification.topics.TopicSpace;
-import org.apache.ws.notification.topics.TopicExpressionEngine;
-import org.apache.ws.notification.topics.v2004_06.TopicsConstants;
+import org.apache.ws.notification.topics.TopicSpaceSet;
 import org.apache.ws.notification.topics.impl.ResourcePropertyValueChangeTopicImpl;
-import org.apache.ws.notification.topics.impl.TopicSpaceImpl;
 import org.apache.ws.notification.topics.impl.ResourceTerminationTopicImpl;
 import org.apache.ws.notification.topics.impl.TopicExpressionEngineImpl;
-import org.apache.ws.notification.base.v2004_6.porttype.NotificationProducerPortType;
-import org.apache.ws.resource.properties.ResourceProperty;
-import org.apache.ws.resource.properties.ResourcePropertySet;
-import org.apache.ws.resource.properties.NamespaceVersionHolder;
+import org.apache.ws.notification.topics.impl.TopicSpaceImpl;
+import org.apache.ws.notification.topics.v2004_06.TopicsConstants;
 import org.apache.ws.resource.Resource;
 import org.apache.ws.resource.lifetime.ResourceTerminationListener;
+import org.apache.ws.resource.properties.NamespaceVersionHolder;
+import org.apache.ws.resource.properties.ResourceProperty;
+import org.apache.ws.resource.properties.ResourcePropertySet;
 import org.apache.ws.util.XmlBeanUtils;
 import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.FixedTopicSetDocument;
 import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.TopicDocument;
-import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.TopicExpressionType;
 import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.TopicExpressionDialectsDocument;
+import org.oasisOpen.docs.wsn.x2004.x06.wsnWSBaseNotification12Draft01.TopicExpressionType;
 
 import javax.xml.namespace.QName;
-import java.util.Iterator;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
 /**
@@ -58,26 +57,24 @@
             throws Exception
     {
         //there can be only 1 !
-        String namespace = namespaces.getLifetimeXsdNamespace();
-        Topic topic = null;
+        String namespace = namespaces.getLifetimeXsdNamespace();        
         TopicSpace topicSpace = topicSpaceSet.getTopicSpace(namespace);
         if (topicSpace == null)
         {
             topicSpace = new TopicSpaceImpl(namespace);
             topicSpaceSet.addTopicSpace(topicSpace);
         }
+        Topic topic;
         if (topicSpace.topicIterator().hasNext())
         {
             topic = (Topic) topicSpace.topicIterator().next();
         }
         else
         {
-            //create the topic
             topic = new ResourceTerminationTopicImpl(namespaces);
             resource.addTerminationListener((ResourceTerminationListener) topic);
             topicSpace.addTopic(topic);
         }
-
         return topic;
     }
 

Modified: incubator/hermes/trunk/src/java/org/apache/ws/pubsub/NotificationProducer.java
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/pubsub/NotificationProducer.java?rev=168467&r1=168466&r2=168467&view=diff
==============================================================================
--- incubator/hermes/trunk/src/java/org/apache/ws/pubsub/NotificationProducer.java (original)
+++ incubator/hermes/trunk/src/java/org/apache/ws/pubsub/NotificationProducer.java Thu May  5 16:31:26 2005
@@ -15,10 +15,9 @@
  */
 package org.apache.ws.pubsub;
 
-//import org.apache.axis.message.addressing.EndpointReference;
 import org.apache.ws.addressing.EndpointReference;
+
 import java.util.Calendar;
-import java.net.URI;
 
 public interface NotificationProducer
 {
@@ -34,14 +33,15 @@
      *
      * @param notificationConsumer
      * @param subscriptionEndConsumer the callback Interface for SubscriptionEnd Notifications, or null if no SubscriptionEnd should be send
-     * @param filters
+     * @param topicFilter
+     * @param xpathFilter
      * @param initialTerminationTime
-     * @param deliveryMode the notification delivery mode, or null to use default mode
+     * @param useNotify the notification delivery mode, or null to use default mode
      *
      * @return the subscription
      */
     Subscription subscribe( NotificationConsumer notificationConsumer, SubscriptionEndConsumer subscriptionEndConsumer,
-                            TopicFilter tf, XPathFilter xf,
+                            TopicFilter topicFilter, XPathFilter xpathFilter,
                             Calendar initialTerminationTime,
-                            boolean UseNotify);
+                            boolean useNotify);
 }

Modified: incubator/hermes/trunk/src/java/org/apache/ws/pubsub/Subscription.java
URL: http://svn.apache.org/viewcvs/incubator/hermes/trunk/src/java/org/apache/ws/pubsub/Subscription.java?rev=168467&r1=168466&r2=168467&view=diff
==============================================================================
--- incubator/hermes/trunk/src/java/org/apache/ws/pubsub/Subscription.java (original)
+++ incubator/hermes/trunk/src/java/org/apache/ws/pubsub/Subscription.java Thu May  5 16:31:26 2005
@@ -15,7 +15,6 @@
  */
 package org.apache.ws.pubsub;
 
-import java.net.URI;
 import java.util.Calendar;
 
 public interface Subscription



---------------------------------------------------------------------
To unsubscribe, e-mail: hermes-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: hermes-dev-help@ws.apache.org