You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by gn...@apache.org on 2011/10/07 11:41:14 UTC

svn commit: r1179981 [1/7] - in /cxf/trunk/services: ./ wsn/ wsn/src/ wsn/src/main/ wsn/src/main/java/ wsn/src/main/java/org/ wsn/src/main/java/org/apache/ wsn/src/main/java/org/apache/cxf/ wsn/src/main/java/org/apache/cxf/wsn/ wsn/src/main/java/org/ap...

Author: gnodet
Date: Fri Oct  7 09:41:10 2011
New Revision: 1179981

URL: http://svn.apache.org/viewvc?rev=1179981&view=rev
Log:
[CXF-3848] WS-Notification service implementation

Added:
    cxf/trunk/services/wsn/
    cxf/trunk/services/wsn/pom.xml
    cxf/trunk/services/wsn/src/
    cxf/trunk/services/wsn/src/main/
    cxf/trunk/services/wsn/src/main/java/
    cxf/trunk/services/wsn/src/main/java/org/
    cxf/trunk/services/wsn/src/main/java/org/apache/
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractCreatePullPoint.java
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractEndpoint.java
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractNotificationBroker.java
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractPublisher.java
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractPullPoint.java
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractSubscription.java
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/EndpointManager.java
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/EndpointRegistrationException.java
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Consumer.java
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/CreatePullPoint.java
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Publisher.java
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/PullPoint.java
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Referencable.java
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Registration.java
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Subscription.java
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsCreatePullPoint.java
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsEndpointManager.java
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsNotificationBroker.java
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsPublisher.java
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsPullPoint.java
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsSubscription.java
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/InvalidTopicException.java
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsCreatePullPoint.java
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsNotificationBroker.java
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsPublisher.java
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsPullPoint.java
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsSubscription.java
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsTopicExpressionConverter.java
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/util/
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/util/DOMUtil.java
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/util/IdGenerator.java
    cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/util/WSNHelper.java
    cxf/trunk/services/wsn/src/main/resources/
    cxf/trunk/services/wsn/src/main/resources/org/
    cxf/trunk/services/wsn/src/main/resources/org/apache/
    cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/
    cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/
    cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/2001/
    cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/2001/XMLSchema.dtd
    cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/2001/XMLSchema.xsd
    cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/2001/datatypes.dtd
    cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/b-2.xsd
    cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/bf-2.xsd
    cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/br-2.xsd
    cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/brw-2.wsdl
    cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/bw-2.wsdl
    cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/catalog.xml
    cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/r-2.xsd
    cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/rp-2.xsd
    cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/rpw-2.wsdl
    cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/rw-2.wsdl
    cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/t-1.xsd
    cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/ws-addr.xsd
    cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/wsn.wsdl
    cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/xml.xsd
    cxf/trunk/services/wsn/src/test/
    cxf/trunk/services/wsn/src/test/java/
    cxf/trunk/services/wsn/src/test/java/org/
    cxf/trunk/services/wsn/src/test/java/org/apache/
    cxf/trunk/services/wsn/src/test/java/org/apache/cxf/
    cxf/trunk/services/wsn/src/test/java/org/apache/cxf/wsn/
    cxf/trunk/services/wsn/src/test/java/org/apache/cxf/wsn/CxfTest.java
    cxf/trunk/services/wsn/src/test/java/org/apache/cxf/wsn/RiTest.java
    cxf/trunk/services/wsn/src/test/java/org/apache/cxf/wsn/WsnBrokerTest.java
    cxf/trunk/services/wsn/src/test/resources/
    cxf/trunk/services/wsn/src/test/resources/log4j.properties
    cxf/trunk/services/wsn/src/test/resources/logging.properties
Modified:
    cxf/trunk/services/pom.xml

Modified: cxf/trunk/services/pom.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/services/pom.xml?rev=1179981&r1=1179980&r2=1179981&view=diff
==============================================================================
--- cxf/trunk/services/pom.xml (original)
+++ cxf/trunk/services/pom.xml Fri Oct  7 09:41:10 2011
@@ -34,6 +34,7 @@
 
     <modules>
         <module>sts</module>
+        <module>wsn</module>
     </modules>
 
 </project>

Added: cxf/trunk/services/wsn/pom.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/pom.xml?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/pom.xml (added)
+++ cxf/trunk/services/wsn/pom.xml Fri Oct  7 09:41:10 2011
@@ -0,0 +1,112 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+ 
+  http://www.apache.org/licenses/LICENSE-2.0
+ 
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.cxf</groupId>
+    <artifactId>cxf-services-wsn</artifactId>
+    <packaging>jar</packaging>
+    <version>2.5.0-SNAPSHOT</version>
+    <name>Apache CXF WSN service</name>
+    <url>http://cxf.apache.org</url>
+
+    <parent>
+        <groupId>org.apache.cxf</groupId>
+        <artifactId>cxf</artifactId>
+        <version>2.5.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+	
+	<dependencies>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.6.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.geronimo.specs</groupId>
+            <artifactId>geronimo-jms_1.1_spec</artifactId>
+            <version>1.1.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-core</artifactId>
+            <version>5.5.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-frontend-jaxws</artifactId>
+            <version>${project.version}</version>
+			<scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.sun.xml.bind</groupId>
+                    <artifactId>jaxb-impl</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-transports-http-jetty</artifactId>
+            <version>${project.version}</version>
+			<scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.8.2</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>1.6.1</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+	
+	<build>
+		<plugins>
+            <plugin>
+                <groupId>org.apache.cxf</groupId>
+                <artifactId>cxf-codegen-plugin</artifactId>
+	            <version>${project.version}</version>
+                <executions>
+                    <execution>
+                        <phase>generate-sources</phase>
+                        <configuration>
+                            <sourceRoot>${basedir}/target/jaxws</sourceRoot>
+                            <wsdlOptions>
+                                <wsdlOption>
+                                    <wsdl>${basedir}/src/main/resources/org/apache/cxf/wsn/wsn.wsdl</wsdl>
+                                    <extraargs>
+                                        <extraarg>-verbose</extraarg>
+                                    </extraargs>
+                                </wsdlOption>
+                            </wsdlOptions>
+                        </configuration>
+                        <goals>
+                            <goal>wsdl2java</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+		</plugins>
+	</build>
+
+</project>

Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractCreatePullPoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractCreatePullPoint.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractCreatePullPoint.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractCreatePullPoint.java Fri Oct  7 09:41:10 2011
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.jws.WebMethod;
+import javax.jws.WebParam;
+import javax.jws.WebResult;
+import javax.jws.WebService;
+
+import org.apache.cxf.wsn.util.DOMUtil;
+import org.apache.cxf.wsn.util.IdGenerator;
+import org.oasis_open.docs.wsn.b_2.CreatePullPointResponse;
+import org.oasis_open.docs.wsn.b_2.UnableToCreatePullPointFaultType;
+import org.oasis_open.docs.wsn.bw_2.CreatePullPoint;
+import org.oasis_open.docs.wsn.bw_2.UnableToCreatePullPointFault;
+import org.oasis_open.docs.wsn.bw_2.UnableToDestroyPullPointFault;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Element;
+
+/**
+ * Base implementation of the WS-Notification CreatePullPoint service.
+ * <p>
+ * Tracks the pull points it has created in a map keyed by pull point address and leaves
+ * the construction of the concrete {@link AbstractPullPoint} to subclasses.
+ */
+@WebService(endpointInterface = "org.oasis_open.docs.wsn.bw_2.CreatePullPoint")
+public abstract class AbstractCreatePullPoint extends AbstractEndpoint implements CreatePullPoint {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractCreatePullPoint.class);
+
+    /** Supplies a name when the create request does not carry a usable one. */
+    private final IdGenerator idGenerator;
+
+    /** Pull points created by this service, keyed by their address. */
+    private final Map<String, AbstractPullPoint> pullPoints;
+
+    public AbstractCreatePullPoint(String name) {
+        super(name);
+        idGenerator = new IdGenerator();
+        pullPoints = new ConcurrentHashMap<String, AbstractPullPoint>();
+    }
+
+    /** Registers this endpoint with its endpoint manager. */
+    public void init() throws Exception {
+        register();
+    }
+
+    /** Unregisters this endpoint from its endpoint manager. */
+    public void destroy() throws Exception {
+        unregister();
+    }
+
+    /**
+     * Web service entry point; logs the call and delegates to
+     * {@link #handleCreatePullPoint(org.oasis_open.docs.wsn.b_2.CreatePullPoint, EndpointManager)}
+     * without an endpoint manager.
+     *
+     * @param createPullPointRequest the create request
+     * @return the response carrying the endpoint reference of the new pull point
+     * @throws UnableToCreatePullPointFault if the new pull point cannot be registered
+     */
+    @WebMethod(operationName = "CreatePullPoint")
+    @WebResult(name = "CreatePullPointResponse",
+               targetNamespace = "http://docs.oasis-open.org/wsn/b-2",
+               partName = "CreatePullPointResponse")
+    public CreatePullPointResponse createPullPoint(
+            @WebParam(name = "CreatePullPoint",
+                      targetNamespace = "http://docs.oasis-open.org/wsn/b-2",
+                      partName = "CreatePullPointRequest")
+            org.oasis_open.docs.wsn.b_2.CreatePullPoint createPullPointRequest) throws UnableToCreatePullPointFault {
+
+        LOGGER.debug("CreatePullEndpoint");
+        return handleCreatePullPoint(createPullPointRequest, null);
+    }
+
+    /**
+     * Creates a pull point, stores it under its address, initialises it from the request and
+     * registers it. If any step fails, the pull point is removed from the map and destroyed.
+     *
+     * @param createPullPointRequest the create request
+     * @param manager endpoint manager to set on the new pull point before registering it;
+     *                <code>null</code> leaves the pull point's manager untouched
+     * @return the response carrying the endpoint reference of the new pull point
+     * @throws UnableToCreatePullPointFault if registering the pull point fails
+     */
+    public CreatePullPointResponse handleCreatePullPoint(
+            org.oasis_open.docs.wsn.b_2.CreatePullPoint createPullPointRequest,
+            EndpointManager manager) throws UnableToCreatePullPointFault {
+        AbstractPullPoint pullPoint = null;
+        boolean success = false;
+        try {
+            pullPoint = createPullPoint(createPullPointName(createPullPointRequest));
+            pullPoint.setCreatePullPoint(this);
+            pullPoints.put(pullPoint.getAddress(), pullPoint);
+            pullPoint.create(createPullPointRequest);
+            if (manager != null) {
+                pullPoint.setManager(manager);
+            }
+            pullPoint.register();
+            CreatePullPointResponse response = new CreatePullPointResponse();
+            response.setPullPoint(pullPoint.getEpr());
+            success = true;
+            return response;
+        } catch (EndpointRegistrationException e) {
+            LOGGER.warn("Unable to register new endpoint", e);
+            UnableToCreatePullPointFaultType fault = new UnableToCreatePullPointFaultType();
+            throw new UnableToCreatePullPointFault("Unable to register new endpoint", fault, e);
+        } finally {
+            // Roll back on every failure path, including unchecked exceptions.
+            if (!success && pullPoint != null) {
+                pullPoints.remove(pullPoint.getAddress());
+                try {
+                    pullPoint.destroy();
+                } catch (UnableToDestroyPullPointFault e) {
+                    LOGGER.info("Error destroying pullPoint", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Chooses the name of the pull point to create.
+     * <p>
+     * A request may carry a <code>name</code> element in the
+     * <code>http://cxf.apache.org/wsn2005/1.0</code> namespace; when several are present the
+     * last non-blank one wins. Extension items that are not DOM elements are skipped, and
+     * when no usable name is found one is obtained from the id generator.
+     *
+     * @param createPullPointRequest the create request
+     * @return the requested name, or a generated one
+     */
+    protected String createPullPointName(org.oasis_open.docs.wsn.b_2.CreatePullPoint createPullPointRequest) {
+        String name = null;
+        for (Iterator<?> it = createPullPointRequest.getAny().iterator(); it.hasNext();) {
+            Object any = it.next();
+            if (!(any instanceof Element)) {
+                // A blind cast would abort the whole request with a ClassCastException
+                // as soon as the extension content holds anything but a DOM element.
+                continue;
+            }
+            Element el = (Element) any;
+            if ("name".equals(el.getLocalName())
+                    && "http://cxf.apache.org/wsn2005/1.0".equals(el.getNamespaceURI())) {
+                // DOMUtil.getElementText is not visible here, so a null result is tolerated too.
+                String text = DOMUtil.getElementText(el);
+                if (text != null && text.trim().length() > 0) {
+                    name = text.trim();
+                }
+            }
+        }
+        if (name == null) {
+            // If no name is given, just generate one
+            name = idGenerator.generateSanitizedId();
+        }
+        return name;
+    }
+
+    /**
+     * Removes the pull point stored under the given address and destroys it; an address
+     * with no stored pull point is ignored.
+     *
+     * @param address address the pull point was stored under
+     * @throws UnableToDestroyPullPointFault propagated from the pull point's destroy()
+     */
+    public void destroyPullPoint(String address) throws UnableToDestroyPullPointFault {
+        AbstractPullPoint pullPoint = pullPoints.remove(address);
+        if (pullPoint != null) {
+            pullPoint.destroy();
+        }
+    }
+
+    /**
+     * Creates the concrete pull point for the given name.
+     *
+     * @param name name chosen by {@link #createPullPointName}
+     * @return the new pull point
+     */
+    protected abstract AbstractPullPoint createPullPoint(String name);
+
+}

Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractEndpoint.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractEndpoint.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractEndpoint.java Fri Oct  7 09:41:10 2011
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn;
+
+import javax.xml.ws.wsaddressing.W3CEndpointReference;
+
+/**
+ * Common base of the WS-Notification endpoints: carries a name and an address and
+ * delegates registration to an {@link EndpointManager}.
+ */
+public abstract class AbstractEndpoint {
+
+    /** Name supplied to the constructor. */
+    protected final String name;
+
+    /** Address handed to the manager when registering. */
+    protected String address;
+
+    /** Manager used to register, unregister and look up the endpoint reference. */
+    protected EndpointManager manager;
+
+    /** Handle returned by the manager's register call. */
+    protected Object endpoint;
+
+    public AbstractEndpoint(String name) {
+        this.name = name;
+    }
+
+    public String getName() {
+        return this.name;
+    }
+
+    public String getAddress() {
+        return this.address;
+    }
+
+    public void setAddress(String address) {
+        this.address = address;
+    }
+
+    public EndpointManager getManager() {
+        return this.manager;
+    }
+
+    public void setManager(EndpointManager manager) {
+        this.manager = manager;
+    }
+
+    /**
+     * Registers this object with the manager under its current address and keeps the
+     * handle the manager returns.
+     */
+    public void register() throws EndpointRegistrationException {
+        this.endpoint = this.manager.register(getAddress(), this);
+    }
+
+    /** Unregisters the stored handle; does nothing while no handle is stored. */
+    public void unregister() throws EndpointRegistrationException {
+        if (this.endpoint == null) {
+            return;
+        }
+        this.manager.unregister(this.endpoint);
+    }
+
+    /**
+     * @return the endpoint reference the manager reports for the stored handle, or
+     *         <code>null</code> while no handle is stored
+     */
+    public W3CEndpointReference getEpr() {
+        return this.endpoint == null ? null : this.manager.getEpr(this.endpoint);
+    }
+
+}

Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractNotificationBroker.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractNotificationBroker.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractNotificationBroker.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractNotificationBroker.java Fri Oct  7 09:41:10 2011
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.jws.Oneway;
+import javax.jws.WebMethod;
+import javax.jws.WebParam;
+import javax.jws.WebResult;
+import javax.jws.WebService;
+import javax.xml.namespace.QName;
+import javax.xml.ws.wsaddressing.W3CEndpointReference;
+
+import org.apache.cxf.wsn.util.WSNHelper;
+import org.apache.cxf.wsn.util.IdGenerator;
+import org.oasis_open.docs.wsn.b_2.GetCurrentMessage;
+import org.oasis_open.docs.wsn.b_2.GetCurrentMessageResponse;
+import org.oasis_open.docs.wsn.b_2.NoCurrentMessageOnTopicFaultType;
+import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
+import org.oasis_open.docs.wsn.b_2.Notify;
+import org.oasis_open.docs.wsn.b_2.Subscribe;
+import org.oasis_open.docs.wsn.b_2.SubscribeCreationFailedFaultType;
+import org.oasis_open.docs.wsn.b_2.SubscribeResponse;
+import org.oasis_open.docs.wsn.br_2.PublisherRegistrationFailedFaultType;
+import org.oasis_open.docs.wsn.br_2.RegisterPublisher;
+import org.oasis_open.docs.wsn.br_2.RegisterPublisherResponse;
+import org.oasis_open.docs.wsn.brw_2.NotificationBroker;
+import org.oasis_open.docs.wsn.brw_2.PublisherRegistrationFailedFault;
+import org.oasis_open.docs.wsn.brw_2.PublisherRegistrationRejectedFault;
+import org.oasis_open.docs.wsn.brw_2.ResourceNotDestroyedFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidFilterFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidMessageContentExpressionFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidProducerPropertiesExpressionFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidTopicExpressionFault;
+import org.oasis_open.docs.wsn.bw_2.MultipleTopicsSpecifiedFault;
+import org.oasis_open.docs.wsn.bw_2.NoCurrentMessageOnTopicFault;
+import org.oasis_open.docs.wsn.bw_2.SubscribeCreationFailedFault;
+import org.oasis_open.docs.wsn.bw_2.TopicExpressionDialectUnknownFault;
+import org.oasis_open.docs.wsn.bw_2.TopicNotSupportedFault;
+import org.oasis_open.docs.wsn.bw_2.UnableToDestroySubscriptionFault;
+import org.oasis_open.docs.wsn.bw_2.UnacceptableInitialTerminationTimeFault;
+import org.oasis_open.docs.wsn.bw_2.UnrecognizedPolicyRequestFault;
+import org.oasis_open.docs.wsn.bw_2.UnsupportedPolicyRequestFault;
+import org.oasis_open.docs.wsrf.rp_2.GetResourcePropertyResponse;
+import org.oasis_open.docs.wsrf.rp_2.InvalidResourcePropertyQNameFaultType;
+import org.oasis_open.docs.wsrf.rpw_2.GetResourceProperty;
+import org.oasis_open.docs.wsrf.rpw_2.InvalidResourcePropertyQNameFault;
+import org.oasis_open.docs.wsrf.rw_2.ResourceUnavailableFault;
+import org.oasis_open.docs.wsrf.rw_2.ResourceUnknownFault;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@WebService(endpointInterface = "org.oasis_open.docs.wsn.brw_2.NotificationBroker")
+public abstract class AbstractNotificationBroker extends AbstractEndpoint implements NotificationBroker, GetResourceProperty {
+
+    public static final String NAMESPACE_URI = "http://docs.oasis-open.org/wsn/b-2";
+    public static final String PREFIX = "wsnt";
+    public static final QName TOPIC_EXPRESSION_QNAME = new QName(NAMESPACE_URI, "TopicExpression", PREFIX);
+    public static final QName FIXED_TOPIC_SET_QNAME = new QName(NAMESPACE_URI, "FixedTopicSet", PREFIX);
+    public static final QName TOPIC_EXPRESSION_DIALECT_QNAME = new QName(NAMESPACE_URI, "TopicExpressionDialect", PREFIX);
+    public static final QName TOPIC_SET_QNAME = new QName(NAMESPACE_URI, "TopicSet", PREFIX);
+
+    private final Logger logger = LoggerFactory.getLogger(AbstractNotificationBroker.class);
+
+    private IdGenerator idGenerator;
+
+    private AbstractPublisher anonymousPublisher;
+
+    private Map<String, AbstractPublisher> publishers;
+
+    private Map<String, AbstractSubscription> subscriptions;
+
+    public AbstractNotificationBroker(String name) {
+        super(name);
+        this.publishers = new ConcurrentHashMap<String, AbstractPublisher>(); // looked up by producer address
+        this.subscriptions = new ConcurrentHashMap<String, AbstractSubscription>(); // keyed by subscription address
+        this.idGenerator = new IdGenerator();
+    }
+
+    public void init() throws Exception { // registers the broker itself, then its anonymous publisher
+        register();
+        anonymousPublisher = createPublisher("AnonymousPublisher"); // addressed below by resolving its name against the broker address
+        anonymousPublisher.setAddress(new URI(getAddress()).resolve(anonymousPublisher.getName()).toString());
+        anonymousPublisher.register();
+    }
+
+    public void destroy() throws Exception { // tears down in the reverse order of init()
+        anonymousPublisher.destroy(); // NOTE(review): NullPointerException if init() never assigned the publisher - confirm lifecycle
+        unregister();
+    }
+
+    /**
+     * Web service entry point for Notify; logs the call and delegates to {@link #handleNotify(Notify)}.
+     * @param notify the Notify request to dispatch
+     */
+    @WebMethod(operationName = "Notify")
+    @Oneway
+    public void notify(
+            @WebParam(name = "Notify", 
+                      targetNamespace = "http://docs.oasis-open.org/wsn/b-1", // NOTE(review): b-1 here vs b-2 in NAMESPACE_URI - confirm
+                      partName = "Notify")
+            Notify notify) {
+
+        logger.debug("Notify");
+        handleNotify(notify);
+    }
+
+    /** Hands each message of the Notify request to the publisher resolved from its producer reference. */
+    protected void handleNotify(Notify notify) {
+        for (NotificationMessageHolderType holder : notify.getNotificationMessage()) {
+            AbstractPublisher target = getPublisher(holder.getProducerReference());
+            if (target != null) {
+                target.notify(holder);
+            }
+        }
+    }
+
+    /** Resolves the publisher stored under the producer's address; falls back to the anonymous publisher. */
+    protected AbstractPublisher getPublisher(W3CEndpointReference producerReference) {
+        AbstractPublisher publisher = null;
+        if (producerReference != null) {
+            // WSNHelper.getWSAAddress is not visible here; a null address must not reach the
+            // ConcurrentHashMap, which rejects null keys with a NullPointerException.
+            String address = WSNHelper.getWSAAddress(producerReference);
+            publisher = address != null ? publishers.get(address) : null;
+        }
+        return publisher != null ? publisher : anonymousPublisher;
+    }
+
+    /**
+     * Web service entry point for Subscribe; logs the call and delegates to handleSubscribe with a null manager.
+     * @param subscribeRequest the Subscribe request, passed on unchanged
+     * @return the response built by handleSubscribe (an org.oasis_open.docs.wsn.b_2.SubscribeResponse)
+     * @throws SubscribeCreationFailedFault
+     * @throws InvalidTopicExpressionFault
+     * @throws TopicNotSupportedFault
+     * @throws InvalidFilterFault
+     * @throws InvalidProducerPropertiesExpressionFault
+     * @throws ResourceUnknownFault
+     * @throws InvalidMessageContentExpressionFault
+     * @throws TopicExpressionDialectUnknownFault
+     * @throws UnacceptableInitialTerminationTimeFault
+     */
+    @WebMethod(operationName = "Subscribe")
+    @WebResult(name = "SubscribeResponse", 
+               targetNamespace = "http://docs.oasis-open.org/wsn/b-1", // NOTE(review): b-1 here vs b-2 in NAMESPACE_URI - confirm
+               partName = "SubscribeResponse")
+    public SubscribeResponse subscribe(
+            @WebParam(name = "Subscribe", 
+                      targetNamespace = "http://docs.oasis-open.org/wsn/b-1", 
+                      partName = "SubscribeRequest")
+            Subscribe subscribeRequest) throws InvalidFilterFault, InvalidMessageContentExpressionFault,
+            InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, ResourceUnknownFault,
+            SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault,
+            UnacceptableInitialTerminationTimeFault, UnsupportedPolicyRequestFault, UnrecognizedPolicyRequestFault {
+
+        logger.debug("Subscribe");
+        return handleSubscribe(subscribeRequest, null);
+    }
+
+    /** Creates, stores and registers a subscription; a failed attempt is dropped from the map and unsubscribed. */
+    public SubscribeResponse handleSubscribe(Subscribe subscribeRequest, EndpointManager manager)
+        throws InvalidFilterFault, InvalidMessageContentExpressionFault,
+            InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault,
+            SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault,
+            TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault,
+            UnsupportedPolicyRequestFault, UnrecognizedPolicyRequestFault {
+        AbstractSubscription subscription = null;
+        boolean success = false;
+        try {
+            subscription = createSubscription(idGenerator.generateSanitizedId());
+            subscription.setBroker(this);
+            subscriptions.put(subscription.getAddress(), subscription);
+            subscription.create(subscribeRequest);
+            if (manager != null) {
+                subscription.setManager(manager);
+            }
+            subscription.register();
+            SubscribeResponse response = new SubscribeResponse();
+            response.setSubscriptionReference(subscription.getEpr());
+            success = true;
+            return response;
+        } catch (EndpointRegistrationException e) {
+            logger.warn("Unable to register new endpoint", e);
+            SubscribeCreationFailedFaultType fault = new SubscribeCreationFailedFaultType();
+            throw new SubscribeCreationFailedFault("Unable to register new endpoint", fault, e);
+        } finally {
+            if (!success && subscription != null) {
+                subscriptions.remove(subscription.getAddress()); // the map is keyed by address; removing by value left the entry behind
+                try {
+                    subscription.unsubscribe();
+                } catch (UnableToDestroySubscriptionFault e) {
+                    logger.info("Error destroying subscription", e);
+                }
+            }
+        }
+    }
+
+    public void unsubscribe(String address) throws UnableToDestroySubscriptionFault {
+        AbstractSubscription removed = subscriptions.remove(address); // null when nothing is stored under the address
+        if (removed != null) {
+            removed.unsubscribe();
+        }
+    }
+
+    /**
+     * Not implemented here: logs the call and always throws NoCurrentMessageOnTopicFault.
+     * @param getCurrentMessageRequest not read by this implementation
+     * @return never returns normally (declared type is org.oasis_open.docs.wsn.b_2.GetCurrentMessageResponse)
+     * @throws MultipleTopicsSpecifiedFault
+     * @throws TopicNotSupportedFault
+     * @throws InvalidTopicExpressionFault
+     * @throws ResourceUnknownFault
+     * @throws TopicExpressionDialectUnknownFault
+     * @throws NoCurrentMessageOnTopicFault
+     */
+    @WebMethod(operationName = "GetCurrentMessage")
+    @WebResult(name = "GetCurrentMessageResponse", 
+               targetNamespace = "http://docs.oasis-open.org/wsn/b-1", // NOTE(review): b-1 here vs b-2 in NAMESPACE_URI - confirm
+               partName = "GetCurrentMessageResponse")
+    public GetCurrentMessageResponse getCurrentMessage(
+            @WebParam(name = "GetCurrentMessage", 
+                      targetNamespace = "http://docs.oasis-open.org/wsn/b-1", 
+                      partName = "GetCurrentMessageRequest")
+            GetCurrentMessage getCurrentMessageRequest) throws InvalidTopicExpressionFault,
+            MultipleTopicsSpecifiedFault, NoCurrentMessageOnTopicFault, ResourceUnknownFault,
+            TopicExpressionDialectUnknownFault, TopicNotSupportedFault {
+
+        logger.debug("GetCurrentMessage");
+        NoCurrentMessageOnTopicFaultType fault = new NoCurrentMessageOnTopicFaultType();
+        throw new NoCurrentMessageOnTopicFault("There is no current message on this topic.", fault);
+    }
+
+    /**
+     * Web-service entry point for the RegisterPublisher operation. Logs the
+     * call and hands the request to
+     * {@link #handleRegisterPublisher(RegisterPublisher)}.
+     *
+     * @param registerPublisherRequest the incoming registration request
+     * @return the response produced by {@code handleRegisterPublisher}
+     * @throws InvalidTopicExpressionFault propagated from {@code handleRegisterPublisher}
+     * @throws PublisherRegistrationFailedFault propagated from {@code handleRegisterPublisher}
+     * @throws PublisherRegistrationRejectedFault propagated from {@code handleRegisterPublisher}
+     * @throws ResourceUnknownFault propagated from {@code handleRegisterPublisher}
+     * @throws TopicNotSupportedFault propagated from {@code handleRegisterPublisher}
+     */
+    @WebMethod(operationName = "RegisterPublisher")
+    @WebResult(name = "RegisterPublisherResponse",
+               targetNamespace = "http://docs.oasis-open.org/wsn/br-1",
+               partName = "RegisterPublisherResponse")
+    public RegisterPublisherResponse registerPublisher(
+            @WebParam(name = "RegisterPublisher",
+                      targetNamespace = "http://docs.oasis-open.org/wsn/br-1",
+                      partName = "RegisterPublisherRequest")
+            RegisterPublisher registerPublisherRequest)
+        throws InvalidTopicExpressionFault, PublisherRegistrationFailedFault,
+            PublisherRegistrationRejectedFault, ResourceUnknownFault, TopicNotSupportedFault {
+
+        logger.debug("RegisterPublisher");
+        RegisterPublisherResponse reply = handleRegisterPublisher(registerPublisherRequest);
+        return reply;
+    }
+
+    public RegisterPublisherResponse handleRegisterPublisher(RegisterPublisher registerPublisherRequest) throws InvalidTopicExpressionFault, PublisherRegistrationFailedFault,
+            PublisherRegistrationRejectedFault, ResourceUnknownFault, TopicNotSupportedFault {
+        AbstractPublisher publisher = null;
+        boolean success = false;
+        try {
+            publisher = createPublisher(idGenerator.generateSanitizedId());
+            publisher.register();
+            publisher.create(registerPublisherRequest);
+            RegisterPublisherResponse response = new RegisterPublisherResponse();
+            response.setPublisherRegistrationReference(publisher.getEpr());
+            publishers.put(WSNHelper.getWSAAddress(publisher.getPublisherReference()), publisher);
+            success = true;
+            return response;
+        } catch (EndpointRegistrationException e) {
+            logger.warn("Unable to register new endpoint", e);
+            PublisherRegistrationFailedFaultType fault = new PublisherRegistrationFailedFaultType();
+            throw new PublisherRegistrationFailedFault("Unable to register new endpoint", fault, e);
+        } finally {
+            if (!success && publisher != null) {
+                try {
+                    publisher.destroy();
+                } catch (ResourceNotDestroyedFault e) {
+                    logger.info("Error destroying publisher", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Creates the publisher object for a new registration. Called by
+     * {@code handleRegisterPublisher} with a freshly generated id; the returned
+     * publisher is then registered and created by the caller.
+     *
+     * @param name name for the new publisher
+     * @return the new publisher instance
+     */
+    protected abstract AbstractPublisher createPublisher(String name);
+
+    /**
+     * Creates a subscription object with the given name.
+     * NOTE(review): the call site is not part of this excerpt; presumably used
+     * by the subscribe path in the same way createPublisher is used for
+     * publisher registration - confirm.
+     *
+     * @param name name for the new subscription
+     * @return the new subscription instance
+     */
+    protected abstract AbstractSubscription createSubscription(String name);
+
+    @WebResult(name = "GetResourcePropertyResponse", targetNamespace = "http://docs.oasis-open.org/wsrf/rp-2", partName = "GetResourcePropertyResponse")
+    @WebMethod(operationName = "GetResourceProperty")
+    public GetResourcePropertyResponse getResourceProperty(
+        @WebParam(partName = "GetResourcePropertyRequest", name = "GetResourceProperty", targetNamespace = "http://docs.oasis-open.org/wsrf/rp-2")
+        javax.xml.namespace.QName getResourcePropertyRequest
+    ) throws ResourceUnavailableFault, ResourceUnknownFault, InvalidResourcePropertyQNameFault {
+
+        logger.debug("GetResourceProperty");
+        return handleGetResourceProperty(getResourcePropertyRequest);
+    }
+
+    /**
+     * Default resource-property lookup: this implementation knows no
+     * properties and always raises {@link InvalidResourcePropertyQNameFault}
+     * naming the requested QName.
+     *
+     * @param property QName of the requested resource property
+     * @return never returns normally in this implementation
+     * @throws InvalidResourcePropertyQNameFault always
+     * @throws ResourceUnavailableFault declared for overriding implementations, not raised here
+     * @throws ResourceUnknownFault declared for overriding implementations, not raised here
+     */
+    protected GetResourcePropertyResponse handleGetResourceProperty(QName property)
+        throws ResourceUnavailableFault, ResourceUnknownFault, InvalidResourcePropertyQNameFault {
+        InvalidResourcePropertyQNameFaultType detail = new InvalidResourcePropertyQNameFaultType();
+        String message = "Invalid resource property QName: " + property;
+        throw new InvalidResourcePropertyQNameFault(message, detail);
+    }
+
+}

Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractPublisher.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractPublisher.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractPublisher.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractPublisher.java Fri Oct  7 09:41:10 2011
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn;
+
+import java.util.List;
+
+import javax.jws.WebMethod;
+import javax.jws.WebParam;
+import javax.jws.WebResult;
+import javax.jws.WebService;
+import javax.xml.ws.wsaddressing.W3CEndpointReference;
+
+import org.oasis_open.docs.wsn.b_2.InvalidTopicExpressionFaultType;
+import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
+import org.oasis_open.docs.wsn.b_2.TopicExpressionType;
+import org.oasis_open.docs.wsn.br_2.DestroyRegistration;
+import org.oasis_open.docs.wsn.br_2.DestroyRegistrationResponse;
+import org.oasis_open.docs.wsn.br_2.PublisherRegistrationFailedFaultType;
+import org.oasis_open.docs.wsn.br_2.RegisterPublisher;
+import org.oasis_open.docs.wsn.br_2.ResourceNotDestroyedFaultType;
+import org.oasis_open.docs.wsn.brw_2.PublisherRegistrationFailedFault;
+import org.oasis_open.docs.wsn.brw_2.PublisherRegistrationManager;
+import org.oasis_open.docs.wsn.brw_2.PublisherRegistrationRejectedFault;
+import org.oasis_open.docs.wsn.brw_2.ResourceNotDestroyedFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidTopicExpressionFault;
+import org.oasis_open.docs.wsn.bw_2.TopicNotSupportedFault;
+import org.oasis_open.docs.wsrf.rw_2.ResourceUnknownFault;
+
+@WebService(endpointInterface = "org.oasis_open.docs.wsn.brw_2.PublisherRegistrationManager")
+public abstract class AbstractPublisher extends AbstractEndpoint implements PublisherRegistrationManager {
+
+    protected W3CEndpointReference publisherReference;
+
+    protected boolean demand;
+
+    protected List<TopicExpressionType> topic;
+
+    public AbstractPublisher(String name) {
+        super(name);
+    }
+
+    public W3CEndpointReference getPublisherReference() {
+        return publisherReference;
+    }
+
+    /**
+     * 
+     * @param destroyRegistrationRequest
+     * @return returns org.oasis_open.docs.wsn.br_1.DestroyResponse
+     * @throws ResourceNotDestroyedFault
+     * @throws ResourceUnknownFault
+     */
+    @WebMethod(operationName = "DestroyRegistration")
+    @WebResult(name = "DestroyRegistrationResponse", 
+               targetNamespace = "http://docs.oasis-open.org/wsn/br-2", 
+               partName = "DestroyRegistrationResponse")
+    public DestroyRegistrationResponse destroyRegistration(
+            @WebParam(name = "DestroyRegistration", 
+                      targetNamespace = "http://docs.oasis-open.org/wsn/br-2", 
+                      partName = "DestroyRegistrationRequest")
+            DestroyRegistration destroyRegistrationRequest) throws ResourceNotDestroyedFault, ResourceUnknownFault {
+
+        destroy();
+        return new DestroyRegistrationResponse();
+    }
+
+    public abstract void notify(NotificationMessageHolderType messageHolder);
+
+    protected void destroy() throws ResourceNotDestroyedFault {
+        try {
+            unregister();
+        } catch (EndpointRegistrationException e) {
+            ResourceNotDestroyedFaultType fault = new ResourceNotDestroyedFaultType();
+            throw new ResourceNotDestroyedFault("Error unregistering endpoint", fault, e);
+        }
+    }
+
+    public void create(RegisterPublisher registerPublisherRequest) throws InvalidTopicExpressionFault,
+            PublisherRegistrationFailedFault, PublisherRegistrationRejectedFault, ResourceUnknownFault,
+            TopicNotSupportedFault {
+        validatePublisher(registerPublisherRequest);
+        start();
+    }
+
+    protected void validatePublisher(RegisterPublisher registerPublisherRequest) throws InvalidTopicExpressionFault,
+            PublisherRegistrationFailedFault, PublisherRegistrationRejectedFault, ResourceUnknownFault,
+            TopicNotSupportedFault {
+        // Check consumer reference
+        publisherReference = registerPublisherRequest.getPublisherReference();
+        // Check topic
+        topic = registerPublisherRequest.getTopic();
+        // Check demand based
+        demand = registerPublisherRequest.isDemand() != null ? registerPublisherRequest.isDemand().booleanValue()
+                : false;
+        // Check all parameters
+        if (publisherReference == null) {
+            PublisherRegistrationFailedFaultType fault = new PublisherRegistrationFailedFaultType();
+            throw new PublisherRegistrationFailedFault("Invalid PublisherReference: null", fault);
+        }
+        if (demand && (topic == null || topic.size() == 0)) {
+            InvalidTopicExpressionFaultType fault = new InvalidTopicExpressionFaultType();
+            throw new InvalidTopicExpressionFault(
+                    "Must specify at least one topic for demand-based publishing", fault);
+        }
+    }
+
+    protected abstract void start() throws PublisherRegistrationFailedFault;
+}

Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractPullPoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractPullPoint.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractPullPoint.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractPullPoint.java Fri Oct  7 09:41:10 2011
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn;
+
+import java.math.BigInteger;
+import java.util.List;
+
+import javax.jws.Oneway;
+import javax.jws.WebMethod;
+import javax.jws.WebParam;
+import javax.jws.WebResult;
+import javax.jws.WebService;
+
+import org.oasis_open.docs.wsn.b_2.CreatePullPoint;
+import org.oasis_open.docs.wsn.b_2.DestroyPullPoint;
+import org.oasis_open.docs.wsn.b_2.DestroyPullPointResponse;
+import org.oasis_open.docs.wsn.b_2.GetMessages;
+import org.oasis_open.docs.wsn.b_2.GetMessagesResponse;
+import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
+import org.oasis_open.docs.wsn.b_2.Notify;
+import org.oasis_open.docs.wsn.b_2.UnableToDestroyPullPointFaultType;
+import org.oasis_open.docs.wsn.bw_2.NotificationConsumer;
+import org.oasis_open.docs.wsn.bw_2.PullPoint;
+import org.oasis_open.docs.wsn.bw_2.UnableToCreatePullPointFault;
+import org.oasis_open.docs.wsn.bw_2.UnableToDestroyPullPointFault;
+import org.oasis_open.docs.wsn.bw_2.UnableToGetMessagesFault;
+import org.oasis_open.docs.wsrf.rw_2.ResourceUnknownFault;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@WebService(endpointInterface = "org.oasis_open.docs.wsn.bw_2.PullPoint")
+public abstract class AbstractPullPoint extends AbstractEndpoint implements PullPoint, NotificationConsumer {
+
+    private final Logger logger = LoggerFactory.getLogger(AbstractPullPoint.class);
+
+    protected AbstractCreatePullPoint createPullPoint;
+
+    public AbstractPullPoint(String name) {
+        super(name);
+    }
+
+    /**
+     * 
+     * @param notify
+     */
+    @WebMethod(operationName = "Notify")
+    @Oneway
+    public void notify(
+            @WebParam(name = "Notify", 
+                      targetNamespace = "http://docs.oasis-open.org/wsn/b-1", 
+                      partName = "Notify")
+            Notify notify) {
+
+        logger.debug("Notify");
+        for (NotificationMessageHolderType messageHolder : notify.getNotificationMessage()) {
+            store(messageHolder);
+        }
+    }
+
+    /**
+     * 
+     * @param getMessagesRequest
+     * @return returns org.oasis_open.docs.wsn.b_1.GetMessagesResponse
+     * @throws ResourceUnknownFault
+     * @throws UnableToGetMessagesFault
+     */
+    @WebMethod(operationName = "GetMessages")
+    @WebResult(name = "GetMessagesResponse", 
+               targetNamespace = "http://docs.oasis-open.org/wsn/b-1", 
+               partName = "GetMessagesResponse")
+    public GetMessagesResponse getMessages(
+            @WebParam(name = "GetMessages", 
+                      targetNamespace = "http://docs.oasis-open.org/wsn/b-1", 
+                      partName = "GetMessagesRequest")
+            GetMessages getMessagesRequest) throws ResourceUnknownFault, UnableToGetMessagesFault {
+
+        logger.debug("GetMessages");
+        BigInteger max = getMessagesRequest.getMaximumNumber();
+        List<NotificationMessageHolderType> messages = getMessages(max != null ? max.intValue() : 0);
+        GetMessagesResponse response = new GetMessagesResponse();
+        response.getNotificationMessage().addAll(messages);
+        return response;
+    }
+
+    /**
+     * 
+     * @param destroyPullPointRequest
+     * @return returns org.oasis_open.docs.wsn.b_1.DestroyResponse
+     * @throws ResourceUnknownFault
+     * @throws UnableToDestroyPullPointFault
+     */
+    @WebMethod(operationName = "DestroyPullPoint")
+    @WebResult(name = "DestroyPullPointResponse", 
+               targetNamespace = "http://docs.oasis-open.org/wsn/b-2", 
+               partName = "DestroyPullPointResponse")
+    public DestroyPullPointResponse destroyPullPoint(
+            @WebParam(name = "DestroyPullPoint", 
+                      targetNamespace = "http://docs.oasis-open.org/wsn/b-2", 
+                      partName = "DestroyPullPointRequest")
+            DestroyPullPoint destroyPullPointRequest) throws ResourceUnknownFault, UnableToDestroyPullPointFault {
+
+        logger.debug("Destroy");
+        createPullPoint.destroyPullPoint(getAddress());
+        return new DestroyPullPointResponse();
+    }
+
+    public void create(CreatePullPoint createPullPointRequest) throws UnableToCreatePullPointFault {
+    }
+
+    protected abstract void store(NotificationMessageHolderType messageHolder);
+
+    protected abstract List<NotificationMessageHolderType> getMessages(int max) throws ResourceUnknownFault,
+            UnableToGetMessagesFault;
+
+    protected void destroy() throws UnableToDestroyPullPointFault {
+        try {
+            unregister();
+        } catch (EndpointRegistrationException e) {
+            UnableToDestroyPullPointFaultType fault = new UnableToDestroyPullPointFaultType();
+            throw new UnableToDestroyPullPointFault("Error unregistering endpoint", fault, e);
+        }
+    }
+
+    public AbstractCreatePullPoint getCreatePullPoint() {
+        return createPullPoint;
+    }
+
+    public void setCreatePullPoint(AbstractCreatePullPoint createPullPoint) {
+        this.createPullPoint = createPullPoint;
+    }
+}

Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractSubscription.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractSubscription.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractSubscription.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractSubscription.java Fri Oct  7 09:41:10 2011
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn;
+
+import java.util.GregorianCalendar;
+import javax.jws.WebMethod;
+import javax.jws.WebParam;
+import javax.jws.WebResult;
+import javax.jws.WebService;
+import javax.xml.bind.JAXBElement;
+import javax.xml.datatype.DatatypeConfigurationException;
+import javax.xml.datatype.DatatypeConstants;
+import javax.xml.datatype.DatatypeFactory;
+import javax.xml.datatype.Duration;
+import javax.xml.datatype.XMLGregorianCalendar;
+import javax.xml.namespace.QName;
+import javax.xml.ws.wsaddressing.W3CEndpointReference;
+
+import org.oasis_open.docs.wsn.b_2.*;
+import org.oasis_open.docs.wsn.bw_2.InvalidFilterFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidMessageContentExpressionFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidProducerPropertiesExpressionFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidTopicExpressionFault;
+import org.oasis_open.docs.wsn.bw_2.PausableSubscriptionManager;
+import org.oasis_open.docs.wsn.bw_2.PauseFailedFault;
+import org.oasis_open.docs.wsn.bw_2.ResumeFailedFault;
+import org.oasis_open.docs.wsn.bw_2.SubscribeCreationFailedFault;
+import org.oasis_open.docs.wsn.bw_2.TopicExpressionDialectUnknownFault;
+import org.oasis_open.docs.wsn.bw_2.TopicNotSupportedFault;
+import org.oasis_open.docs.wsn.bw_2.UnableToDestroySubscriptionFault;
+import org.oasis_open.docs.wsn.bw_2.UnacceptableInitialTerminationTimeFault;
+import org.oasis_open.docs.wsn.bw_2.UnacceptableTerminationTimeFault;
+import org.oasis_open.docs.wsn.bw_2.UnrecognizedPolicyRequestFault;
+import org.oasis_open.docs.wsn.bw_2.UnsupportedPolicyRequestFault;
+import org.oasis_open.docs.wsrf.rw_2.ResourceUnknownFault;
+
+/**
+ * Base class for subscriptions. It implements the web-service operations of
+ * {@code PausableSubscriptionManager} by delegating to abstract hooks
+ * ({@link #start()}, {@link #pause()}, {@link #resume()},
+ * {@link #renew(XMLGregorianCalendar)}) and provides validation of the
+ * Subscribe request and of termination times.
+ */
+@WebService(endpointInterface = "org.oasis_open.docs.wsn.bw_2.PausableSubscriptionManager")
+public abstract class AbstractSubscription extends AbstractEndpoint implements PausableSubscriptionManager {
+
+    public static final String WSN_URI = "http://docs.oasis-open.org/wsn/b-2";
+
+    public static final String XPATH1_URI = "http://www.w3.org/TR/1999/REC-xpath-19991116";
+
+    public static final QName QNAME_TOPIC_EXPRESSION = new QName(WSN_URI, "TopicExpression");
+
+    public static final QName QNAME_PRODUCER_PROPERTIES = new QName(WSN_URI, "ProducerProperties");
+
+    public static final QName QNAME_MESSAGE_CONTENT = new QName(WSN_URI, "MessageContent");
+
+    public static final QName QNAME_USE_RAW = new QName(WSN_URI, "UseRaw");
+
+    /** Factory used to parse durations and calendars and to build the current time. */
+    protected DatatypeFactory datatypeFactory;
+
+    /** Termination time parsed from the Subscribe request, if any. */
+    protected XMLGregorianCalendar terminationTime;
+
+    /** Set when the subscription policy contains a UseRaw element. */
+    protected boolean useRaw;
+
+    /** Topic expression taken from the Subscribe request filter. */
+    protected TopicExpressionType topic;
+
+    /** MessageContent filter taken from the Subscribe request filter, if any. */
+    protected QueryExpressionType contentFilter;
+
+    /** Consumer reference taken from the Subscribe request. */
+    protected W3CEndpointReference consumerReference;
+
+    /** Broker used by {@link #unsubscribe(Unsubscribe)} to remove this subscription. */
+    protected AbstractNotificationBroker broker;
+
+    /**
+     * @param name name passed on to the {@code AbstractEndpoint} constructor
+     * @throws RuntimeException if no {@link DatatypeFactory} can be created
+     */
+    public AbstractSubscription(String name) {
+        super(name);
+        try {
+            this.datatypeFactory = DatatypeFactory.newInstance();
+        } catch (DatatypeConfigurationException e) {
+            throw new RuntimeException("Unable to initialize subscription", e);
+        }
+    }
+
+    /**
+     * Handles the Renew operation: validates the requested termination time,
+     * passes it to {@link #renew(XMLGregorianCalendar)} and reports it back
+     * together with the current time.
+     *
+     * @param renewRequest the incoming request
+     * @return response carrying the accepted termination time and the current time
+     * @throws UnacceptableTerminationTimeFault if the time cannot be parsed, is not
+     *         after the current time, or is refused by {@code renew}
+     * @throws ResourceUnknownFault declared by the operation, not raised here
+     */
+    @WebMethod(operationName = "Renew")
+    @WebResult(name = "RenewResponse",
+               targetNamespace = "http://docs.oasis-open.org/wsn/b-2",
+               partName = "RenewResponse")
+    public RenewResponse renew(
+            @WebParam(name = "Renew",
+                      targetNamespace = "http://docs.oasis-open.org/wsn/b-2",
+                      partName = "RenewRequest")
+            Renew renewRequest) throws ResourceUnknownFault, UnacceptableTerminationTimeFault {
+
+        XMLGregorianCalendar time = validateTerminationTime(renewRequest.getTerminationTime());
+        renew(time);
+        RenewResponse response = new RenewResponse();
+        response.setTerminationTime(time);
+        response.setCurrentTime(getCurrentTime());
+        return response;
+    }
+
+    /**
+     * Handles the Unsubscribe operation by asking the broker to remove the
+     * subscription stored under this endpoint's address.
+     *
+     * @param unsubscribeRequest the incoming request; not inspected here
+     * @return a new, empty {@link UnsubscribeResponse}
+     * @throws UnableToDestroySubscriptionFault propagated from the broker
+     * @throws ResourceUnknownFault declared by the operation, not raised here
+     */
+    @WebMethod(operationName = "Unsubscribe")
+    @WebResult(name = "UnsubscribeResponse",
+               targetNamespace = "http://docs.oasis-open.org/wsn/b-2",
+               partName = "UnsubscribeResponse")
+    public UnsubscribeResponse unsubscribe(
+            @WebParam(name = "Unsubscribe",
+                      targetNamespace = "http://docs.oasis-open.org/wsn/b-2",
+                      partName = "UnsubscribeRequest")
+            Unsubscribe unsubscribeRequest) throws ResourceUnknownFault, UnableToDestroySubscriptionFault {
+
+        broker.unsubscribe(getAddress());
+        return new UnsubscribeResponse();
+    }
+
+    /**
+     * Handles the PauseSubscription operation by calling {@link #pause()}.
+     *
+     * @param pauseSubscriptionRequest the incoming request; not inspected here
+     * @return a new, empty {@link PauseSubscriptionResponse}
+     * @throws PauseFailedFault propagated from {@code pause()}
+     * @throws ResourceUnknownFault declared by the operation, not raised here
+     */
+    @WebMethod(operationName = "PauseSubscription")
+    @WebResult(name = "PauseSubscriptionResponse",
+               targetNamespace = "http://docs.oasis-open.org/wsn/b-2",
+               partName = "PauseSubscriptionResponse")
+    public PauseSubscriptionResponse pauseSubscription(
+            @WebParam(name = "PauseSubscription",
+                      targetNamespace = "http://docs.oasis-open.org/wsn/b-2",
+                      partName = "PauseSubscriptionRequest")
+            PauseSubscription pauseSubscriptionRequest) throws PauseFailedFault, ResourceUnknownFault {
+
+        pause();
+        return new PauseSubscriptionResponse();
+    }
+
+    /**
+     * Handles the ResumeSubscription operation by calling {@link #resume()}.
+     *
+     * @param resumeSubscriptionRequest the incoming request; not inspected here
+     * @return a new, empty {@link ResumeSubscriptionResponse}
+     * @throws ResumeFailedFault propagated from {@code resume()}
+     * @throws ResourceUnknownFault declared by the operation, not raised here
+     */
+    @WebMethod(operationName = "ResumeSubscription")
+    @WebResult(name = "ResumeSubscriptionResponse",
+               targetNamespace = "http://docs.oasis-open.org/wsn/b-2",
+               partName = "ResumeSubscriptionResponse")
+    public ResumeSubscriptionResponse resumeSubscription(
+            @WebParam(name = "ResumeSubscription",
+                      targetNamespace = "http://docs.oasis-open.org/wsn/b-2",
+                      partName = "ResumeSubscriptionRequest")
+            ResumeSubscription resumeSubscriptionRequest) throws ResourceUnknownFault, ResumeFailedFault {
+
+        resume();
+        return new ResumeSubscriptionResponse();
+    }
+
+    /**
+     * Parses an initial termination time and checks that it lies after the
+     * current time.
+     *
+     * @param value lexical duration or date/time value
+     * @return the resulting absolute termination time
+     * @throws UnacceptableInitialTerminationTimeFault if the value cannot be parsed or
+     *         compares as earlier than or equal to the current time
+     */
+    protected XMLGregorianCalendar validateInitialTerminationTime(String value)
+        throws UnacceptableInitialTerminationTimeFault {
+        XMLGregorianCalendar tt = parseTerminationTime(value);
+        if (tt == null) {
+            UnacceptableInitialTerminationTimeFaultType fault = new UnacceptableInitialTerminationTimeFaultType();
+            throw new UnacceptableInitialTerminationTimeFault("Unable to parse initial termination time: '" + value
+                    + "'", fault);
+        }
+        XMLGregorianCalendar ct = getCurrentTime();
+        int c = tt.compare(ct);
+        if (c == DatatypeConstants.LESSER || c == DatatypeConstants.EQUAL) {
+            UnacceptableInitialTerminationTimeFaultType fault = new UnacceptableInitialTerminationTimeFaultType();
+            // Tell the caller the earliest time that would have been compared against.
+            fault.setMinimumTime(ct);
+            throw new UnacceptableInitialTerminationTimeFault("Invalid initial termination time", fault);
+        }
+        return tt;
+    }
+
+    /**
+     * Parses a termination time (used by Renew) and checks that it lies
+     * after the current time.
+     *
+     * @param value lexical duration or date/time value
+     * @return the resulting absolute termination time
+     * @throws UnacceptableTerminationTimeFault if the value cannot be parsed or
+     *         compares as earlier than or equal to the current time
+     */
+    protected XMLGregorianCalendar validateTerminationTime(String value) throws UnacceptableTerminationTimeFault {
+        XMLGregorianCalendar tt = parseTerminationTime(value);
+        if (tt == null) {
+            UnacceptableTerminationTimeFaultType fault = new UnacceptableTerminationTimeFaultType();
+            throw new UnacceptableTerminationTimeFault("Unable to parse termination time: '" + value + "'", fault);
+        }
+        XMLGregorianCalendar ct = getCurrentTime();
+        int c = tt.compare(ct);
+        if (c == DatatypeConstants.LESSER || c == DatatypeConstants.EQUAL) {
+            UnacceptableTerminationTimeFaultType fault = new UnacceptableTerminationTimeFaultType();
+            fault.setMinimumTime(ct);
+            throw new UnacceptableTerminationTimeFault("Invalid termination time", fault);
+        }
+        return tt;
+    }
+
+    /**
+     * Turns a lexical termination time into an absolute time. The value is
+     * first tried as a duration (general, day-time, then year-month form),
+     * in which case it is added to the current time; if none of those parse,
+     * it is tried as an absolute calendar value.
+     *
+     * @param value the lexical value; any parse failure (including a
+     *        {@code null} value) is swallowed on purpose so the next form can be tried
+     * @return the absolute time, or {@code null} if no form could be parsed
+     */
+    protected XMLGregorianCalendar parseTerminationTime(String value) {
+        try {
+            Duration d = datatypeFactory.newDuration(value);
+            XMLGregorianCalendar c = getCurrentTime();
+            c.add(d);
+            return c;
+        } catch (Exception e) {
+            // Not a general duration; try the next form.
+        }
+        try {
+            Duration d = datatypeFactory.newDurationDayTime(value);
+            XMLGregorianCalendar c = getCurrentTime();
+            c.add(d);
+            return c;
+        } catch (Exception e) {
+            // Not a day-time duration; try the next form.
+        }
+        try {
+            Duration d = datatypeFactory.newDurationYearMonth(value);
+            XMLGregorianCalendar c = getCurrentTime();
+            c.add(d);
+            return c;
+        } catch (Exception e) {
+            // Not a year-month duration; try an absolute time.
+        }
+        try {
+            return datatypeFactory.newXMLGregorianCalendar(value);
+        } catch (Exception e) {
+            // Not an absolute time either; fall through to null.
+        }
+        return null;
+    }
+
+    /**
+     * @return a new calendar value built from a fresh {@link GregorianCalendar}
+     */
+    protected XMLGregorianCalendar getCurrentTime() {
+        return datatypeFactory.newXMLGregorianCalendar(new GregorianCalendar());
+    }
+
+    public XMLGregorianCalendar getTerminationTime() {
+        return terminationTime;
+    }
+
+    public void setTerminationTime(XMLGregorianCalendar terminationTime) {
+        this.terminationTime = terminationTime;
+    }
+
+    /**
+     * Validates the Subscribe request and then starts the subscription.
+     *
+     * @param subscribeRequest the subscription request
+     * @throws InvalidFilterFault see {@link #validateSubscription(Subscribe)}
+     * @throws InvalidMessageContentExpressionFault see {@code validateSubscription}
+     * @throws InvalidProducerPropertiesExpressionFault see {@code validateSubscription}
+     * @throws InvalidTopicExpressionFault see {@code validateSubscription}
+     * @throws SubscribeCreationFailedFault if validation or {@link #start()} fails
+     * @throws TopicExpressionDialectUnknownFault declared for overriding implementations
+     * @throws TopicNotSupportedFault declared for overriding implementations
+     * @throws UnacceptableInitialTerminationTimeFault see {@code validateSubscription}
+     * @throws UnrecognizedPolicyRequestFault see {@code validateSubscription}
+     * @throws UnsupportedPolicyRequestFault declared for overriding implementations
+     */
+    public void create(Subscribe subscribeRequest) throws InvalidFilterFault, InvalidMessageContentExpressionFault,
+            InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, SubscribeCreationFailedFault,
+            TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault,
+            UnrecognizedPolicyRequestFault, UnsupportedPolicyRequestFault {
+        validateSubscription(subscribeRequest);
+        start();
+    }
+
+    /** Starts the subscription once the request has been validated. */
+    protected abstract void start() throws SubscribeCreationFailedFault;
+
+    /** Pauses the subscription; invoked by {@link #pauseSubscription(PauseSubscription)}. */
+    protected abstract void pause() throws PauseFailedFault;
+
+    /** Resumes the subscription; invoked by {@link #resumeSubscription(ResumeSubscription)}. */
+    protected abstract void resume() throws ResumeFailedFault;
+
+    /**
+     * Applies an already validated termination time; invoked by
+     * {@link #renew(Renew)}.
+     *
+     * @param time the new termination time
+     */
+    protected abstract void renew(XMLGregorianCalendar time) throws UnacceptableTerminationTimeFault;
+
+    /**
+     * Unregisters this endpoint, translating a registration error into the
+     * WS-Notification fault.
+     *
+     * @throws UnableToDestroySubscriptionFault if unregistering fails
+     */
+    protected void unsubscribe() throws UnableToDestroySubscriptionFault {
+        try {
+            unregister();
+        } catch (EndpointRegistrationException e) {
+            UnableToDestroySubscriptionFaultType fault = new UnableToDestroySubscriptionFaultType();
+            throw new UnableToDestroySubscriptionFault("Error unregistering endpoint", fault, e);
+        }
+    }
+
+    /**
+     * Extracts consumer reference, termination time, topic, content filter
+     * and the UseRaw policy from the request into this object's fields and
+     * checks them. Fields are assigned as the request is walked, so they may
+     * be partially populated when a fault is raised.
+     * <p>
+     * This base validation rejects any request that carries an initial
+     * termination time, after first checking that the time itself is valid.
+     *
+     * @param subscribeRequest the subscription request
+     * @throws InvalidTopicExpressionFault if a topic expression is not carried by a
+     *         {@code TopicExpression} element (including when it is not wrapped
+     *         in a {@link JAXBElement} at all)
+     * @throws InvalidProducerPropertiesExpressionFault if a ProducerProperties filter is present
+     * @throws InvalidMessageContentExpressionFault if more than one MessageContent filter
+     *         is present or its dialect is not XPath 1.0
+     * @throws InvalidFilterFault if a filter element is not recognised or no topic is given
+     * @throws UnrecognizedPolicyRequestFault if a policy element other than UseRaw is present
+     * @throws SubscribeCreationFailedFault if the consumer reference is missing
+     * @throws UnacceptableInitialTerminationTimeFault if an initial termination time is
+     *         present (invalid, or valid but unsupported here)
+     * @throws TopicExpressionDialectUnknownFault declared for overriding implementations
+     * @throws TopicNotSupportedFault declared for overriding implementations
+     * @throws UnsupportedPolicyRequestFault declared for overriding implementations
+     */
+    protected void validateSubscription(Subscribe subscribeRequest) throws InvalidFilterFault,
+            InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault,
+            InvalidTopicExpressionFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault,
+            TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault, UnrecognizedPolicyRequestFault,
+            UnsupportedPolicyRequestFault {
+        // Check consumer reference
+        consumerReference = subscribeRequest.getConsumerReference();
+        // Check terminationTime
+        if (subscribeRequest.getInitialTerminationTime() != null
+                && !subscribeRequest.getInitialTerminationTime().isNil()
+                && subscribeRequest.getInitialTerminationTime().getValue() != null) {
+            String strTerminationTime = subscribeRequest.getInitialTerminationTime().getValue();
+            terminationTime = validateInitialTerminationTime(strTerminationTime.trim());
+        }
+        // Check filter
+        if (subscribeRequest.getFilter() != null) {
+            for (Object f : subscribeRequest.getFilter().getAny()) {
+                // Unwrap JAXB elements but remember the wrapper: its name tells
+                // which kind of filter this is.
+                JAXBElement<?> e = null;
+                if (f instanceof JAXBElement) {
+                    e = (JAXBElement<?>) f;
+                    f = e.getValue();
+                }
+                if (f instanceof TopicExpressionType) {
+                    // A bare TopicExpressionType has no element name to verify;
+                    // reject it with the declared fault instead of failing on a null wrapper.
+                    if (e == null || !e.getName().equals(QNAME_TOPIC_EXPRESSION)) {
+                        InvalidTopicExpressionFaultType fault = new InvalidTopicExpressionFaultType();
+                        throw new InvalidTopicExpressionFault("Unrecognized TopicExpression: "
+                                + (e != null ? e : f), fault);
+                    }
+                    topic = (TopicExpressionType) f;
+                } else if (f instanceof QueryExpressionType) {
+                    if (e != null && e.getName().equals(QNAME_PRODUCER_PROPERTIES)) {
+                        InvalidProducerPropertiesExpressionFaultType fault =
+                            new InvalidProducerPropertiesExpressionFaultType();
+                        throw new InvalidProducerPropertiesExpressionFault("ProducerProperties are not supported",
+                                fault);
+                    } else if (e != null && e.getName().equals(QNAME_MESSAGE_CONTENT)) {
+                        if (contentFilter != null) {
+                            InvalidMessageContentExpressionFaultType fault =
+                                new InvalidMessageContentExpressionFaultType();
+                            throw new InvalidMessageContentExpressionFault(
+                                    "Only one MessageContent filter can be specified", fault);
+                        }
+                        contentFilter = (QueryExpressionType) f;
+                        // Defaults to XPath 1.0
+                        if (contentFilter.getDialect() == null) {
+                            contentFilter.setDialect(XPATH1_URI);
+                        }
+                    } else {
+                        InvalidFilterFaultType fault = new InvalidFilterFaultType();
+                        throw new InvalidFilterFault("Unrecognized filter: " + (e != null ? e.getName() : f), fault);
+                    }
+                } else {
+                    InvalidFilterFaultType fault = new InvalidFilterFaultType();
+                    throw new InvalidFilterFault("Unrecognized filter: " + (e != null ? e.getName() : f), fault);
+                }
+            }
+        }
+        // Check policy
+        if (subscribeRequest.getSubscriptionPolicy() != null) {
+            for (Object p : subscribeRequest.getSubscriptionPolicy().getAny()) {
+                if (p instanceof JAXBElement) {
+                    p = ((JAXBElement<?>) p).getValue();
+                }
+                if (p instanceof UseRaw) {
+                    useRaw = true;
+                } else {
+                    UnrecognizedPolicyRequestFaultType fault = new UnrecognizedPolicyRequestFaultType();
+                    throw new UnrecognizedPolicyRequestFault("Unrecognized policy: " + p, fault);
+                }
+            }
+        }
+        // Check all parameters
+        if (consumerReference == null) {
+            SubscribeCreationFailedFaultType fault = new SubscribeCreationFailedFaultType();
+            throw new SubscribeCreationFailedFault("Invalid ConsumerReference: null", fault);
+        }
+        // TODO check we can resolve endpoint
+        if (topic == null) {
+            InvalidFilterFaultType fault = new InvalidFilterFaultType();
+            throw new InvalidFilterFault("Must specify a topic to subscribe on", fault);
+        }
+        if (contentFilter != null && !contentFilter.getDialect().equals(XPATH1_URI)) {
+            InvalidMessageContentExpressionFaultType fault = new InvalidMessageContentExpressionFaultType();
+            throw new InvalidMessageContentExpressionFault("Unsupported MessageContent dialect: '"
+                    + contentFilter.getDialect() + "'", fault);
+        }
+        if (terminationTime != null) {
+            UnacceptableInitialTerminationTimeFaultType fault = new UnacceptableInitialTerminationTimeFaultType();
+            throw new UnacceptableInitialTerminationTimeFault("InitialTerminationTime is not supported", fault);
+        }
+    }
+
+    public AbstractNotificationBroker getBroker() {
+        return broker;
+    }
+
+    public void setBroker(AbstractNotificationBroker broker) {
+        this.broker = broker;
+    }
+}

Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/EndpointManager.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/EndpointManager.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/EndpointManager.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/EndpointManager.java Fri Oct  7 09:41:10 2011
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn;
+
+import javax.xml.ws.wsaddressing.W3CEndpointReference;
+
+/**
+ * Contract for exposing service objects at an address and for obtaining a
+ * WS-Addressing endpoint reference for what has been exposed.
+ * <p>
+ * Endpoints are represented as plain {@link Object} handles, so the concrete
+ * endpoint type is left to the implementation.
+ */
+public interface EndpointManager {
+
+    /**
+     * Registers a service object under the given address.
+     *
+     * @param address the address at which the service is to be registered
+     * @param service the service object to register
+     * @return an implementation-defined endpoint handle; presumably the value to pass to
+     *         {@link #unregister(Object)} and {@link #getEpr(Object)} (confirm against
+     *         the implementations)
+     * @throws EndpointRegistrationException declared failure type of the registration
+     */
+    Object register(String address, Object service) throws EndpointRegistrationException;
+
+    /**
+     * Unregisters an endpoint.
+     *
+     * @param endpoint the endpoint handle to unregister; presumably one previously returned
+     *                 by {@link #register(String, Object)} (confirm against the implementations)
+     * @throws EndpointRegistrationException declared failure type of the unregistration
+     */
+    void unregister(Object endpoint) throws EndpointRegistrationException;
+
+    /**
+     * Returns the WS-Addressing endpoint reference of an endpoint.
+     *
+     * @param endpoint the endpoint handle to describe
+     * @return the {@link W3CEndpointReference} for {@code endpoint}
+     */
+    W3CEndpointReference getEpr(Object endpoint);
+}

Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/EndpointRegistrationException.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/EndpointRegistrationException.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/EndpointRegistrationException.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/EndpointRegistrationException.java Fri Oct  7 09:41:10 2011
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn;
+
+/**
+ * Checked exception declared by {@link EndpointManager#register(String, Object)} and
+ * {@link EndpointManager#unregister(Object)}.
+ * <p>
+ * It adds no state of its own; every constructor simply forwards to the matching
+ * {@link Exception} constructor.
+ */
+public class EndpointRegistrationException extends Exception {
+
+    /** Generated serial version UID. */
+    private static final long serialVersionUID = 6365080415473176527L;
+
+    /** Creates an exception with neither a detail message nor a cause. */
+    public EndpointRegistrationException() {
+        // The no-argument superclass constructor is invoked implicitly.
+    }
+
+    /**
+     * Creates an exception with a detail message.
+     *
+     * @param message the detail message
+     */
+    public EndpointRegistrationException(String message) {
+        super(message);
+    }
+
+    /**
+     * Creates an exception wrapping an underlying cause.
+     *
+     * @param cause the underlying cause
+     */
+    public EndpointRegistrationException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Creates an exception with both a detail message and an underlying cause.
+     *
+     * @param message the detail message
+     * @param cause the underlying cause
+     */
+    public EndpointRegistrationException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Consumer.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Consumer.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Consumer.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Consumer.java Fri Oct  7 09:41:10 2011
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn.client;
+
+import javax.jws.WebParam;
+import javax.jws.WebService;
+import javax.xml.ws.Endpoint;
+import javax.xml.ws.wsaddressing.W3CEndpointReference;
+
+import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
+import org.oasis_open.docs.wsn.b_2.Notify;
+import org.oasis_open.docs.wsn.bw_2.NotificationConsumer;
+
+@WebService(endpointInterface = "org.oasis_open.docs.wsn.bw_2.NotificationConsumer")
+public class Consumer implements NotificationConsumer, Referencable {
+
+    public interface Callback {
+        void notify(NotificationMessageHolderType message);
+    }
+
+    private final Callback callback;
+    private final Endpoint endpoint;
+
+    public Consumer(Callback callback, String address) {
+        this.callback = callback;
+        this.endpoint = Endpoint.create(this);
+        this.endpoint.publish(address);
+    }
+
+    public void stop() {
+        this.endpoint.stop();
+    }
+
+    public W3CEndpointReference getEpr() {
+        return this.endpoint.getEndpointReference(W3CEndpointReference.class);
+    }
+
+    public void notify(@WebParam(partName = "Notify", name = "Notify", targetNamespace = "http://docs.oasis-open.org/wsn/b-2") Notify notify) {
+        for (NotificationMessageHolderType message : notify.getNotificationMessage()) {
+            this.callback.notify(message);
+        }
+    }
+}

Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/CreatePullPoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/CreatePullPoint.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/CreatePullPoint.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/CreatePullPoint.java Fri Oct  7 09:41:10 2011
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn.client;
+
+import javax.xml.ws.wsaddressing.W3CEndpointReference;
+
+import org.apache.cxf.wsn.util.WSNHelper;
+import org.oasis_open.docs.wsn.b_2.CreatePullPointResponse;
+import org.oasis_open.docs.wsn.bw_2.UnableToCreatePullPointFault;
+
+/**
+ * Client-side wrapper around a {@code CreatePullPoint} port, used to create
+ * {@link PullPoint} clients.
+ */
+public class CreatePullPoint implements Referencable {
+
+    private final org.oasis_open.docs.wsn.bw_2.CreatePullPoint createPullPoint;
+    private final W3CEndpointReference epr;
+
+    /**
+     * Creates a client for the service at the given address.
+     *
+     * @param address the service address, converted to an endpoint reference with
+     *                {@code WSNHelper.createWSA}
+     */
+    public CreatePullPoint(String address) {
+        this(WSNHelper.createWSA(address));
+    }
+
+    /**
+     * Creates a client for the service designated by the given endpoint reference.
+     *
+     * @param epr the endpoint reference of the service; the port is obtained from it
+     *            with {@code WSNHelper.getPort}
+     */
+    public CreatePullPoint(W3CEndpointReference epr) {
+        this.epr = epr;
+        this.createPullPoint =
+            WSNHelper.getPort(epr, org.oasis_open.docs.wsn.bw_2.CreatePullPoint.class);
+    }
+
+    /**
+     * Returns the underlying port.
+     *
+     * @return the port obtained at construction time
+     */
+    public org.oasis_open.docs.wsn.bw_2.CreatePullPoint getCreatePullPoint() {
+        return this.createPullPoint;
+    }
+
+    /**
+     * Returns the endpoint reference this client was built from.
+     *
+     * @return the endpoint reference supplied to, or computed by, the constructor
+     */
+    public W3CEndpointReference getEpr() {
+        return this.epr;
+    }
+
+    /**
+     * Sends an empty {@code CreatePullPoint} request and wraps the pull point reference
+     * found in the response.
+     *
+     * @return a {@link PullPoint} built from the reference in the response
+     * @throws UnableToCreatePullPointFault if the port call reports this fault
+     */
+    public PullPoint create() throws UnableToCreatePullPointFault {
+        CreatePullPointResponse reply =
+            createPullPoint.createPullPoint(new org.oasis_open.docs.wsn.b_2.CreatePullPoint());
+        return new PullPoint(reply.getPullPoint());
+    }
+}

Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java Fri Oct  7 09:41:10 2011
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn.client;
+
+import java.util.Collections;
+import java.util.List;
+import javax.xml.bind.JAXBElement;
+import javax.xml.ws.wsaddressing.W3CEndpointReference;
+
+import org.apache.cxf.wsn.AbstractSubscription;
+import org.apache.cxf.wsn.util.WSNHelper;
+import org.oasis_open.docs.wsn.b_2.FilterType;
+import org.oasis_open.docs.wsn.b_2.GetCurrentMessage;
+import org.oasis_open.docs.wsn.b_2.GetCurrentMessageResponse;
+import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
+import org.oasis_open.docs.wsn.b_2.Notify;
+import org.oasis_open.docs.wsn.b_2.QueryExpressionType;
+import org.oasis_open.docs.wsn.b_2.Subscribe;
+import org.oasis_open.docs.wsn.b_2.SubscribeResponse;
+import org.oasis_open.docs.wsn.b_2.TopicExpressionType;
+import org.oasis_open.docs.wsn.b_2.UseRaw;
+import org.oasis_open.docs.wsn.br_2.RegisterPublisher;
+import org.oasis_open.docs.wsn.br_2.RegisterPublisherResponse;
+import org.oasis_open.docs.wsn.brw_2.PublisherRegistrationFailedFault;
+import org.oasis_open.docs.wsn.brw_2.PublisherRegistrationRejectedFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidFilterFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidMessageContentExpressionFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidProducerPropertiesExpressionFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidTopicExpressionFault;
+import org.oasis_open.docs.wsn.bw_2.MultipleTopicsSpecifiedFault;
+import org.oasis_open.docs.wsn.bw_2.NoCurrentMessageOnTopicFault;
+import org.oasis_open.docs.wsn.bw_2.NotifyMessageNotSupportedFault;
+import org.oasis_open.docs.wsn.bw_2.SubscribeCreationFailedFault;
+import org.oasis_open.docs.wsn.bw_2.TopicExpressionDialectUnknownFault;
+import org.oasis_open.docs.wsn.bw_2.TopicNotSupportedFault;
+import org.oasis_open.docs.wsn.bw_2.UnacceptableInitialTerminationTimeFault;
+import org.oasis_open.docs.wsn.bw_2.UnrecognizedPolicyRequestFault;
+import org.oasis_open.docs.wsn.bw_2.UnsupportedPolicyRequestFault;
+import org.oasis_open.docs.wsrf.rw_2.ResourceUnknownFault;
+
+/**
+ * Client-side convenience wrapper around a {@code NotificationBroker} port.
+ * <p>
+ * Each method builds the corresponding request object, invokes the port and wraps the
+ * reference found in the response. A {@code null} topic always means "no topic": no
+ * topic expression is added to the request.
+ */
+public class NotificationBroker implements Referencable {
+
+    private final org.oasis_open.docs.wsn.brw_2.NotificationBroker broker;
+    private final W3CEndpointReference epr;
+
+    /**
+     * Creates a client for the broker at the given address.
+     *
+     * @param address the broker address, converted to an endpoint reference with
+     *                {@code WSNHelper.createWSA}
+     */
+    public NotificationBroker(String address) {
+        this(WSNHelper.createWSA(address));
+    }
+
+    /**
+     * Creates a client for the broker designated by the given endpoint reference.
+     *
+     * @param epr the endpoint reference of the broker; the port is obtained from it
+     *            with {@code WSNHelper.getPort}
+     */
+    public NotificationBroker(W3CEndpointReference epr) {
+        this.broker = WSNHelper.getPort(epr, org.oasis_open.docs.wsn.brw_2.NotificationBroker.class);
+        this.epr = epr;
+    }
+
+    /**
+     * Returns the underlying port.
+     *
+     * @return the port obtained at construction time
+     */
+    public org.oasis_open.docs.wsn.brw_2.NotificationBroker getBroker() {
+        return broker;
+    }
+
+    /**
+     * Returns the endpoint reference this client was built from.
+     *
+     * @return the endpoint reference supplied to, or computed by, the constructor
+     */
+    public W3CEndpointReference getEpr() {
+        return epr;
+    }
+
+    /**
+     * Sends a notification without a producer reference.
+     *
+     * @param topic the topic, or {@code null} to send the message without a topic
+     * @param msg the message payload
+     */
+    public void notify(String topic, Object msg) {
+        notify(null, topic, msg);
+    }
+
+    /**
+     * Sends a {@code Notify} request containing a single message.
+     *
+     * @param publisher the producer whose reference is attached to the message, or
+     *                  {@code null} to send no producer reference
+     * @param topic the topic, or {@code null} to send the message without a topic
+     * @param msg the payload, set as the {@code any} content of the message
+     */
+    public void notify(Referencable publisher, String topic, Object msg) {
+        Notify notify = new Notify();
+        NotificationMessageHolderType holder = new NotificationMessageHolderType();
+        if (publisher != null) {
+            holder.setProducerReference(publisher.getEpr());
+        }
+        if (topic != null) {
+            holder.setTopic(createTopicExpression(topic));
+        }
+        holder.setMessage(new NotificationMessageHolderType.Message());
+        holder.getMessage().setAny(msg);
+        notify.getNotificationMessage().add(holder);
+        broker.notify(notify);
+    }
+
+    /**
+     * Subscribes a consumer to a topic, without content filter and without raw delivery.
+     *
+     * @param consumer the consumer to notify; must not be {@code null}
+     * @param topic the topic, or {@code null} to send no topic filter
+     * @return a client for the created subscription
+     */
+    public Subscription subscribe(Referencable consumer, String topic)
+        throws TopicExpressionDialectUnknownFault, InvalidFilterFault, TopicNotSupportedFault,
+               UnacceptableInitialTerminationTimeFault, SubscribeCreationFailedFault,
+               InvalidMessageContentExpressionFault, InvalidTopicExpressionFault,
+               ResourceUnknownFault, UnsupportedPolicyRequestFault,
+               UnrecognizedPolicyRequestFault, NotifyMessageNotSupportedFault,
+               InvalidProducerPropertiesExpressionFault {
+        return subscribe(consumer, topic, null, false);
+    }
+
+    /**
+     * Subscribes a consumer to a topic with an optional content filter, without raw delivery.
+     *
+     * @param consumer the consumer to notify; must not be {@code null}
+     * @param topic the topic, or {@code null} to send no topic filter
+     * @param xpath the message content expression, or {@code null} for no content filter
+     * @return a client for the created subscription
+     */
+    public Subscription subscribe(Referencable consumer, String topic, String xpath)
+        throws TopicExpressionDialectUnknownFault, InvalidFilterFault, TopicNotSupportedFault,
+               UnacceptableInitialTerminationTimeFault, SubscribeCreationFailedFault,
+               InvalidMessageContentExpressionFault, InvalidTopicExpressionFault,
+               ResourceUnknownFault, UnsupportedPolicyRequestFault,
+               UnrecognizedPolicyRequestFault, NotifyMessageNotSupportedFault,
+               InvalidProducerPropertiesExpressionFault {
+        return subscribe(consumer, topic, xpath, false);
+    }
+
+    /**
+     * Sends a {@code Subscribe} request for the given consumer.
+     * <p>
+     * The request always carries a filter element. A topic expression and a message
+     * content expression are added to it only when the respective argument is non-null.
+     *
+     * @param consumer the consumer to notify; must not be {@code null}, since its endpoint
+     *                 reference is read unconditionally
+     * @param topic the topic, or {@code null} to send no topic filter
+     * @param xpath the message content expression, sent with the dialect
+     *              {@code AbstractSubscription.XPATH1_URI}, or {@code null} for no content filter
+     * @param raw when {@code true}, a {@code UseRaw} element is added as subscription policy
+     * @return a client built from the subscription reference in the response
+     */
+    public Subscription subscribe(Referencable consumer, String topic,
+                                  String xpath, boolean raw)
+        throws TopicNotSupportedFault, InvalidFilterFault, TopicExpressionDialectUnknownFault,
+               UnacceptableInitialTerminationTimeFault, SubscribeCreationFailedFault,
+               InvalidMessageContentExpressionFault, InvalidTopicExpressionFault,
+               UnrecognizedPolicyRequestFault, UnsupportedPolicyRequestFault,
+               ResourceUnknownFault, NotifyMessageNotSupportedFault,
+               InvalidProducerPropertiesExpressionFault {
+
+        Subscribe subscribeRequest = new Subscribe();
+        subscribeRequest.setConsumerReference(consumer.getEpr());
+        subscribeRequest.setFilter(new FilterType());
+        if (topic != null) {
+            subscribeRequest.getFilter().getAny().add(
+                    new JAXBElement<TopicExpressionType>(AbstractSubscription.QNAME_TOPIC_EXPRESSION,
+                            TopicExpressionType.class, createTopicExpression(topic)));
+        }
+        if (xpath != null) {
+            QueryExpressionType xpathExp = new QueryExpressionType();
+            xpathExp.setDialect(AbstractSubscription.XPATH1_URI);
+            xpathExp.getContent().add(xpath);
+            subscribeRequest.getFilter().getAny().add(
+                    new JAXBElement<QueryExpressionType>(AbstractSubscription.QNAME_MESSAGE_CONTENT,
+                            QueryExpressionType.class, xpathExp));
+        }
+        if (raw) {
+            subscribeRequest.setSubscriptionPolicy(new Subscribe.SubscriptionPolicy());
+            subscribeRequest.getSubscriptionPolicy().getAny().add(new UseRaw());
+        }
+        SubscribeResponse response = broker.subscribe(subscribeRequest);
+        return new Subscription(response.getSubscriptionReference());
+    }
+
+    /**
+     * Sends a {@code GetCurrentMessage} request.
+     *
+     * @param topic the topic to query, or {@code null} to send the request without a topic
+     * @return the {@code any} content of the response, as returned by the port
+     */
+    public List<Object> getCurrentMessage(String topic)
+        throws TopicNotSupportedFault, TopicExpressionDialectUnknownFault,
+               MultipleTopicsSpecifiedFault, InvalidTopicExpressionFault,
+               ResourceUnknownFault, NoCurrentMessageOnTopicFault {
+        GetCurrentMessage getCurrentMessageRequest = new GetCurrentMessage();
+        if (topic != null) {
+            getCurrentMessageRequest.setTopic(createTopicExpression(topic));
+        }
+        GetCurrentMessageResponse response = broker.getCurrentMessage(getCurrentMessageRequest);
+        return response.getAny();
+    }
+
+    /**
+     * Registers a publisher for a single topic, with the demand flag set to {@code false}.
+     *
+     * @param publisher the publisher to register; must not be {@code null}
+     * @param topic the topic, or {@code null} to register without a topic
+     * @return a client for the created publisher registration
+     */
+    public Registration registerPublisher(Referencable publisher, String topic)
+        throws TopicNotSupportedFault, PublisherRegistrationFailedFault,
+               UnacceptableInitialTerminationTimeFault, InvalidTopicExpressionFault,
+               ResourceUnknownFault, PublisherRegistrationRejectedFault {
+        return registerPublisher(publisher, topic, false);
+    }
+
+    /**
+     * Registers a publisher for a single topic.
+     *
+     * @param publisher the publisher to register; must not be {@code null}
+     * @param topic the topic, or {@code null} to register without a topic
+     * @param demand the value of the demand flag sent in the request
+     * @return a client for the created publisher registration
+     */
+    public Registration registerPublisher(Referencable publisher, String topic, boolean demand)
+        throws TopicNotSupportedFault, PublisherRegistrationFailedFault,
+               UnacceptableInitialTerminationTimeFault, InvalidTopicExpressionFault,
+               ResourceUnknownFault, PublisherRegistrationRejectedFault {
+        // A null topic yields a one-element list holding null; the list overload skips it.
+        return registerPublisher(publisher, Collections.singletonList(topic), demand);
+    }
+
+    /**
+     * Sends a {@code RegisterPublisher} request.
+     *
+     * @param publisher the publisher to register; must not be {@code null}, since its endpoint
+     *                  reference is read unconditionally
+     * @param topics the topics to register for; may be {@code null}, and {@code null}
+     *               entries are ignored
+     * @param demand the value of the demand flag sent in the request
+     * @return a client built from the publisher registration reference in the response
+     */
+    public Registration registerPublisher(Referencable publisher, List<String> topics, boolean demand)
+        throws TopicNotSupportedFault, PublisherRegistrationFailedFault,
+               UnacceptableInitialTerminationTimeFault, InvalidTopicExpressionFault,
+               ResourceUnknownFault, PublisherRegistrationRejectedFault {
+
+        RegisterPublisher registerPublisherRequest = new RegisterPublisher();
+        registerPublisherRequest.setPublisherReference(publisher.getEpr());
+        if (topics != null) {
+            for (String topic : topics) {
+                // Skip null entries so that a null topic means "no topic", as in the other
+                // methods, instead of producing a topic expression with null content.
+                if (topic != null) {
+                    registerPublisherRequest.getTopic().add(createTopicExpression(topic));
+                }
+            }
+        }
+        registerPublisherRequest.setDemand(demand);
+        RegisterPublisherResponse response = broker.registerPublisher(registerPublisherRequest);
+        return new Registration(response.getPublisherRegistrationReference());
+    }
+
+    /** Builds a topic expression whose content is the given topic string. */
+    private static TopicExpressionType createTopicExpression(String topic) {
+        TopicExpressionType topicExp = new TopicExpressionType();
+        topicExp.getContent().add(topic);
+        return topicExp;
+    }
+
+}