You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by gn...@apache.org on 2011/10/07 11:41:14 UTC
svn commit: r1179981 [1/7] - in /cxf/trunk/services: ./ wsn/ wsn/src/
wsn/src/main/ wsn/src/main/java/ wsn/src/main/java/org/
wsn/src/main/java/org/apache/ wsn/src/main/java/org/apache/cxf/
wsn/src/main/java/org/apache/cxf/wsn/ wsn/src/main/java/org/ap...
Author: gnodet
Date: Fri Oct 7 09:41:10 2011
New Revision: 1179981
URL: http://svn.apache.org/viewvc?rev=1179981&view=rev
Log:
[CXF-3848] WS-Notification service implementation
Added:
cxf/trunk/services/wsn/
cxf/trunk/services/wsn/pom.xml
cxf/trunk/services/wsn/src/
cxf/trunk/services/wsn/src/main/
cxf/trunk/services/wsn/src/main/java/
cxf/trunk/services/wsn/src/main/java/org/
cxf/trunk/services/wsn/src/main/java/org/apache/
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractCreatePullPoint.java
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractEndpoint.java
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractNotificationBroker.java
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractPublisher.java
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractPullPoint.java
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractSubscription.java
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/EndpointManager.java
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/EndpointRegistrationException.java
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Consumer.java
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/CreatePullPoint.java
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Publisher.java
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/PullPoint.java
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Referencable.java
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Registration.java
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Subscription.java
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsCreatePullPoint.java
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsEndpointManager.java
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsNotificationBroker.java
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsPublisher.java
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsPullPoint.java
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jaxws/JaxwsSubscription.java
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/InvalidTopicException.java
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsCreatePullPoint.java
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsNotificationBroker.java
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsPublisher.java
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsPullPoint.java
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsSubscription.java
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/jms/JmsTopicExpressionConverter.java
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/util/
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/util/DOMUtil.java
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/util/IdGenerator.java
cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/util/WSNHelper.java
cxf/trunk/services/wsn/src/main/resources/
cxf/trunk/services/wsn/src/main/resources/org/
cxf/trunk/services/wsn/src/main/resources/org/apache/
cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/
cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/
cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/2001/
cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/2001/XMLSchema.dtd
cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/2001/XMLSchema.xsd
cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/2001/datatypes.dtd
cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/b-2.xsd
cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/bf-2.xsd
cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/br-2.xsd
cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/brw-2.wsdl
cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/bw-2.wsdl
cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/catalog.xml
cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/r-2.xsd
cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/rp-2.xsd
cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/rpw-2.wsdl
cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/rw-2.wsdl
cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/t-1.xsd
cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/ws-addr.xsd
cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/wsn.wsdl
cxf/trunk/services/wsn/src/main/resources/org/apache/cxf/wsn/xml.xsd
cxf/trunk/services/wsn/src/test/
cxf/trunk/services/wsn/src/test/java/
cxf/trunk/services/wsn/src/test/java/org/
cxf/trunk/services/wsn/src/test/java/org/apache/
cxf/trunk/services/wsn/src/test/java/org/apache/cxf/
cxf/trunk/services/wsn/src/test/java/org/apache/cxf/wsn/
cxf/trunk/services/wsn/src/test/java/org/apache/cxf/wsn/CxfTest.java
cxf/trunk/services/wsn/src/test/java/org/apache/cxf/wsn/RiTest.java
cxf/trunk/services/wsn/src/test/java/org/apache/cxf/wsn/WsnBrokerTest.java
cxf/trunk/services/wsn/src/test/resources/
cxf/trunk/services/wsn/src/test/resources/log4j.properties
cxf/trunk/services/wsn/src/test/resources/logging.properties
Modified:
cxf/trunk/services/pom.xml
Modified: cxf/trunk/services/pom.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/services/pom.xml?rev=1179981&r1=1179980&r2=1179981&view=diff
==============================================================================
--- cxf/trunk/services/pom.xml (original)
+++ cxf/trunk/services/pom.xml Fri Oct 7 09:41:10 2011
@@ -34,6 +34,7 @@
<modules>
<module>sts</module>
+ <module>wsn</module>
</modules>
</project>
Added: cxf/trunk/services/wsn/pom.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/pom.xml?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/pom.xml (added)
+++ cxf/trunk/services/wsn/pom.xml Fri Oct 7 09:41:10 2011
@@ -0,0 +1,112 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-services-wsn</artifactId>
+ <packaging>jar</packaging>
+ <version>2.5.0-SNAPSHOT</version>
+ <name>Apache CXF WSN service</name>
+ <url>http://cxf.apache.org</url>
+
+ <parent>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf</artifactId>
+ <version>2.5.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.6.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-jms_1.1_spec</artifactId>
+ <version>1.1.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-core</artifactId>
+ <version>5.5.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-frontend-jaxws</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.xml.bind</groupId>
+ <artifactId>jaxb-impl</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-transports-http-jetty</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.8.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.6.1</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-codegen-plugin</artifactId>
+ <version>${project.version}</version>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <configuration>
+ <sourceRoot>${basedir}/target/jaxws</sourceRoot>
+ <wsdlOptions>
+ <wsdlOption>
+ <wsdl>${basedir}/src/main/resources/org/apache/cxf/wsn/wsn.wsdl</wsdl>
+ <extraargs>
+ <extraarg>-verbose</extraarg>
+ </extraargs>
+ </wsdlOption>
+ </wsdlOptions>
+ </configuration>
+ <goals>
+ <goal>wsdl2java</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractCreatePullPoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractCreatePullPoint.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractCreatePullPoint.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractCreatePullPoint.java Fri Oct 7 09:41:10 2011
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.jws.WebMethod;
+import javax.jws.WebParam;
+import javax.jws.WebResult;
+import javax.jws.WebService;
+
+import org.apache.cxf.wsn.util.DOMUtil;
+import org.apache.cxf.wsn.util.IdGenerator;
+import org.oasis_open.docs.wsn.b_2.CreatePullPointResponse;
+import org.oasis_open.docs.wsn.b_2.UnableToCreatePullPointFaultType;
+import org.oasis_open.docs.wsn.bw_2.CreatePullPoint;
+import org.oasis_open.docs.wsn.bw_2.UnableToCreatePullPointFault;
+import org.oasis_open.docs.wsn.bw_2.UnableToDestroyPullPointFault;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Element;
+
+/**
+ * Base implementation of the WS-Notification <code>CreatePullPoint</code> service.
+ * <p>
+ * The class keeps track of every pull point it has created in a map keyed by
+ * the pull point address, and delegates the creation of the concrete pull
+ * point to {@link #createPullPoint(String)}.
+ */
+@WebService(endpointInterface = "org.oasis_open.docs.wsn.bw_2.CreatePullPoint")
+public abstract class AbstractCreatePullPoint extends AbstractEndpoint implements CreatePullPoint {
+
+    private final Logger logger = LoggerFactory.getLogger(AbstractCreatePullPoint.class);
+
+    /** Generates names for pull points whose creator did not supply one. */
+    private final IdGenerator idGenerator;
+
+    /** Pull points created by this service, keyed by their address. */
+    private final Map<String, AbstractPullPoint> pullPoints;
+
+    /**
+     * @param name the name of this endpoint, passed on to {@link AbstractEndpoint}
+     */
+    public AbstractCreatePullPoint(String name) {
+        super(name);
+        idGenerator = new IdGenerator();
+        pullPoints = new ConcurrentHashMap<String, AbstractPullPoint>();
+    }
+
+    /**
+     * Registers this endpoint with its endpoint manager.
+     *
+     * @throws Exception if the registration fails
+     */
+    public void init() throws Exception {
+        register();
+    }
+
+    /**
+     * Unregisters this endpoint from its endpoint manager.
+     *
+     * @throws Exception if the unregistration fails
+     */
+    public void destroy() throws Exception {
+        unregister();
+    }
+
+    /**
+     * Web service entry point: creates a new pull point using the manager
+     * already configured on the created pull point (no override is passed).
+     *
+     * @param createPullPointRequest the incoming request
+     * @return the response carrying the endpoint reference of the new pull point
+     * @throws UnableToCreatePullPointFault if the new endpoint cannot be registered
+     */
+    @WebMethod(operationName = "CreatePullPoint")
+    @WebResult(name = "CreatePullPointResponse",
+               targetNamespace = "http://docs.oasis-open.org/wsn/b-2",
+               partName = "CreatePullPointResponse")
+    public CreatePullPointResponse createPullPoint(
+        @WebParam(name = "CreatePullPoint",
+                  targetNamespace = "http://docs.oasis-open.org/wsn/b-2",
+                  partName = "CreatePullPointRequest")
+        org.oasis_open.docs.wsn.b_2.CreatePullPoint createPullPointRequest) throws UnableToCreatePullPointFault {
+
+        logger.debug("CreatePullEndpoint");
+        return handleCreatePullPoint(createPullPointRequest, null);
+    }
+
+    /**
+     * Creates, records and registers a new pull point.
+     * <p>
+     * The pull point is added to the internal map before it is created and
+     * registered; if anything fails afterwards it is removed from the map
+     * again and destroyed on a best-effort basis.
+     *
+     * @param createPullPointRequest the incoming request
+     * @param manager the endpoint manager to set on the new pull point, or
+     *        <code>null</code> to leave the pull point's manager untouched
+     * @return the response carrying the endpoint reference of the new pull point
+     * @throws UnableToCreatePullPointFault if the new endpoint cannot be registered
+     */
+    public CreatePullPointResponse handleCreatePullPoint(
+        org.oasis_open.docs.wsn.b_2.CreatePullPoint createPullPointRequest,
+        EndpointManager manager) throws UnableToCreatePullPointFault {
+        AbstractPullPoint pullPoint = null;
+        boolean success = false;
+        try {
+            pullPoint = createPullPoint(createPullPointName(createPullPointRequest));
+            pullPoint.setCreatePullPoint(this);
+            pullPoints.put(pullPoint.getAddress(), pullPoint);
+            pullPoint.create(createPullPointRequest);
+            if (manager != null) {
+                pullPoint.setManager(manager);
+            }
+            pullPoint.register();
+            CreatePullPointResponse response = new CreatePullPointResponse();
+            response.setPullPoint(pullPoint.getEpr());
+            success = true;
+            return response;
+        } catch (EndpointRegistrationException e) {
+            logger.warn("Unable to register new endpoint", e);
+            UnableToCreatePullPointFaultType fault = new UnableToCreatePullPointFaultType();
+            throw new UnableToCreatePullPointFault("Unable to register new endpoint", fault, e);
+        } finally {
+            // Roll back the bookkeeping on any failure, not only on registration errors
+            if (!success && pullPoint != null) {
+                pullPoints.remove(pullPoint.getAddress());
+                try {
+                    pullPoint.destroy();
+                } catch (UnableToDestroyPullPointFault e) {
+                    logger.info("Error destroying pullPoint", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Determines the name of the pull point to create.
+     * <p>
+     * The wildcard content of the request is searched for a DOM element named
+     * <code>name</code> in the <code>http://cxf.apache.org/wsn2005/1.0</code>
+     * namespace; when several are present the last one wins. Wildcard entries
+     * that are not DOM elements are ignored. If no name is supplied, one is
+     * generated.
+     *
+     * @param createPullPointRequest the incoming request
+     * @return the requested name, or a generated one
+     */
+    protected String createPullPointName(org.oasis_open.docs.wsn.b_2.CreatePullPoint createPullPointRequest) {
+        // Let the creator decide which pull point name to use
+        String name = null;
+        for (Object any : createPullPointRequest.getAny()) {
+            if (!(any instanceof Element)) {
+                // Wildcard content may hold non-DOM objects; they cannot carry the name
+                continue;
+            }
+            Element el = (Element) any;
+            if ("name".equals(el.getLocalName())
+                && "http://cxf.apache.org/wsn2005/1.0".equals(el.getNamespaceURI())) {
+                // Defensive: whether the helper can return null is not visible from here
+                String text = DOMUtil.getElementText(el);
+                if (text != null) {
+                    name = text.trim();
+                }
+            }
+        }
+        if (name == null) {
+            // If no name is given, just generate one
+            name = idGenerator.generateSanitizedId();
+        }
+        return name;
+    }
+
+    /**
+     * Forgets and destroys the pull point registered under the given address.
+     * Does nothing when no pull point is known for that address.
+     *
+     * @param address the address of the pull point to destroy
+     * @throws UnableToDestroyPullPointFault if the pull point cannot be destroyed
+     */
+    public void destroyPullPoint(String address) throws UnableToDestroyPullPointFault {
+        AbstractPullPoint pullPoint = pullPoints.remove(address);
+        if (pullPoint != null) {
+            pullPoint.destroy();
+        }
+    }
+
+    /**
+     * Creates the concrete pull point implementation.
+     *
+     * @param name the name of the pull point
+     * @return the new, not yet registered pull point
+     */
+    protected abstract AbstractPullPoint createPullPoint(String name);
+
+}
Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractEndpoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractEndpoint.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractEndpoint.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractEndpoint.java Fri Oct 7 09:41:10 2011
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn;
+
+import javax.xml.ws.wsaddressing.W3CEndpointReference;
+
+/**
+ * Common base class for the WS-Notification endpoints.
+ * <p>
+ * An endpoint has a fixed name, an address, and an {@link EndpointManager}
+ * through which it is registered and unregistered. The opaque handle returned
+ * by the manager on registration is kept in {@link #endpoint}.
+ */
+public abstract class AbstractEndpoint {
+
+    protected final String name;
+
+    protected String address;
+
+    protected EndpointManager manager;
+
+    /** Handle returned by the manager on registration; <code>null</code> while not registered. */
+    protected Object endpoint;
+
+    /**
+     * @param name the name of this endpoint
+     */
+    public AbstractEndpoint(String name) {
+        this.name = name;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getAddress() {
+        return address;
+    }
+
+    public void setAddress(String address) {
+        this.address = address;
+    }
+
+    /**
+     * Registers this endpoint under its current address with the configured manager.
+     *
+     * @throws EndpointRegistrationException if the manager fails to register the endpoint
+     * @throws IllegalStateException if no manager has been set
+     */
+    public void register() throws EndpointRegistrationException {
+        if (manager == null) {
+            // Fail with an explicit message instead of a bare NullPointerException
+            throw new IllegalStateException(
+                "No EndpointManager has been set on endpoint '" + name + "'");
+        }
+        endpoint = manager.register(getAddress(), this);
+    }
+
+    /**
+     * Unregisters this endpoint if it is currently registered; otherwise does nothing.
+     * <p>
+     * The registration handle is cleared once the manager has unregistered the
+     * endpoint, so that a repeated call is a no-op and {@link #getEpr()} no
+     * longer hands the stale handle to the manager. If the manager throws, the
+     * handle is kept.
+     *
+     * @throws EndpointRegistrationException if the manager fails to unregister the endpoint
+     */
+    public void unregister() throws EndpointRegistrationException {
+        if (endpoint != null) {
+            manager.unregister(endpoint);
+            endpoint = null;
+        }
+    }
+
+    /**
+     * @return the endpoint reference obtained from the manager, or
+     *         <code>null</code> if this endpoint is not registered
+     */
+    public W3CEndpointReference getEpr() {
+        if (endpoint != null) {
+            return manager.getEpr(endpoint);
+        }
+        return null;
+    }
+
+    public EndpointManager getManager() {
+        return manager;
+    }
+
+    public void setManager(EndpointManager manager) {
+        this.manager = manager;
+    }
+
+}
Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractNotificationBroker.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractNotificationBroker.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractNotificationBroker.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractNotificationBroker.java Fri Oct 7 09:41:10 2011
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.jws.Oneway;
+import javax.jws.WebMethod;
+import javax.jws.WebParam;
+import javax.jws.WebResult;
+import javax.jws.WebService;
+import javax.xml.namespace.QName;
+import javax.xml.ws.wsaddressing.W3CEndpointReference;
+
+import org.apache.cxf.wsn.util.WSNHelper;
+import org.apache.cxf.wsn.util.IdGenerator;
+import org.oasis_open.docs.wsn.b_2.GetCurrentMessage;
+import org.oasis_open.docs.wsn.b_2.GetCurrentMessageResponse;
+import org.oasis_open.docs.wsn.b_2.NoCurrentMessageOnTopicFaultType;
+import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
+import org.oasis_open.docs.wsn.b_2.Notify;
+import org.oasis_open.docs.wsn.b_2.Subscribe;
+import org.oasis_open.docs.wsn.b_2.SubscribeCreationFailedFaultType;
+import org.oasis_open.docs.wsn.b_2.SubscribeResponse;
+import org.oasis_open.docs.wsn.br_2.PublisherRegistrationFailedFaultType;
+import org.oasis_open.docs.wsn.br_2.RegisterPublisher;
+import org.oasis_open.docs.wsn.br_2.RegisterPublisherResponse;
+import org.oasis_open.docs.wsn.brw_2.NotificationBroker;
+import org.oasis_open.docs.wsn.brw_2.PublisherRegistrationFailedFault;
+import org.oasis_open.docs.wsn.brw_2.PublisherRegistrationRejectedFault;
+import org.oasis_open.docs.wsn.brw_2.ResourceNotDestroyedFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidFilterFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidMessageContentExpressionFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidProducerPropertiesExpressionFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidTopicExpressionFault;
+import org.oasis_open.docs.wsn.bw_2.MultipleTopicsSpecifiedFault;
+import org.oasis_open.docs.wsn.bw_2.NoCurrentMessageOnTopicFault;
+import org.oasis_open.docs.wsn.bw_2.SubscribeCreationFailedFault;
+import org.oasis_open.docs.wsn.bw_2.TopicExpressionDialectUnknownFault;
+import org.oasis_open.docs.wsn.bw_2.TopicNotSupportedFault;
+import org.oasis_open.docs.wsn.bw_2.UnableToDestroySubscriptionFault;
+import org.oasis_open.docs.wsn.bw_2.UnacceptableInitialTerminationTimeFault;
+import org.oasis_open.docs.wsn.bw_2.UnrecognizedPolicyRequestFault;
+import org.oasis_open.docs.wsn.bw_2.UnsupportedPolicyRequestFault;
+import org.oasis_open.docs.wsrf.rp_2.GetResourcePropertyResponse;
+import org.oasis_open.docs.wsrf.rp_2.InvalidResourcePropertyQNameFaultType;
+import org.oasis_open.docs.wsrf.rpw_2.GetResourceProperty;
+import org.oasis_open.docs.wsrf.rpw_2.InvalidResourcePropertyQNameFault;
+import org.oasis_open.docs.wsrf.rw_2.ResourceUnavailableFault;
+import org.oasis_open.docs.wsrf.rw_2.ResourceUnknownFault;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@WebService(endpointInterface = "org.oasis_open.docs.wsn.brw_2.NotificationBroker")
+public abstract class AbstractNotificationBroker extends AbstractEndpoint implements NotificationBroker, GetResourceProperty {
+
+ public static final String NAMESPACE_URI = "http://docs.oasis-open.org/wsn/b-2";
+ public static final String PREFIX = "wsnt";
+ public static final QName TOPIC_EXPRESSION_QNAME = new QName(NAMESPACE_URI, "TopicExpression", PREFIX);
+ public static final QName FIXED_TOPIC_SET_QNAME = new QName(NAMESPACE_URI, "FixedTopicSet", PREFIX);
+ public static final QName TOPIC_EXPRESSION_DIALECT_QNAME = new QName(NAMESPACE_URI, "TopicExpressionDialect", PREFIX);
+ public static final QName TOPIC_SET_QNAME = new QName(NAMESPACE_URI, "TopicSet", PREFIX);
+
+ private final Logger logger = LoggerFactory.getLogger(AbstractNotificationBroker.class);
+
+ private IdGenerator idGenerator;
+
+ private AbstractPublisher anonymousPublisher;
+
+ private Map<String, AbstractPublisher> publishers;
+
+ private Map<String, AbstractSubscription> subscriptions;
+
+ public AbstractNotificationBroker(String name) {
+ super(name);
+ idGenerator = new IdGenerator();
+ subscriptions = new ConcurrentHashMap<String, AbstractSubscription>();
+ publishers = new ConcurrentHashMap<String, AbstractPublisher>();
+ }
+
+ public void init() throws Exception {
+ register();
+ anonymousPublisher = createPublisher("AnonymousPublisher");
+ anonymousPublisher.setAddress(new URI(getAddress()).resolve(anonymousPublisher.getName()).toString());
+ anonymousPublisher.register();
+ }
+
+ public void destroy() throws Exception {
+ anonymousPublisher.destroy();
+ unregister();
+ }
+
+ /**
+ *
+ * @param notify
+ */
+ @WebMethod(operationName = "Notify")
+ @Oneway
+ public void notify(
+ @WebParam(name = "Notify",
+ targetNamespace = "http://docs.oasis-open.org/wsn/b-1",
+ partName = "Notify")
+ Notify notify) {
+
+ logger.debug("Notify");
+ handleNotify(notify);
+ }
+
+ protected void handleNotify(Notify notify) {
+ for (NotificationMessageHolderType messageHolder : notify.getNotificationMessage()) {
+ W3CEndpointReference producerReference = messageHolder.getProducerReference();
+ AbstractPublisher publisher = getPublisher(producerReference);
+ if (publisher != null) {
+ publisher.notify(messageHolder);
+ }
+ }
+ }
+
+ protected AbstractPublisher getPublisher(W3CEndpointReference producerReference) {
+ AbstractPublisher publisher = null;
+ if (producerReference != null) {
+ String address = WSNHelper.getWSAAddress(producerReference);
+ publisher = publishers.get(address);
+ }
+ if (publisher == null) {
+ publisher = anonymousPublisher;
+ }
+ return publisher;
+ }
+
+ /**
+ *
+ * @param subscribeRequest
+ * @return returns org.oasis_open.docs.wsn.b_1.SubscribeResponse
+ * @throws SubscribeCreationFailedFault
+ * @throws InvalidTopicExpressionFault
+ * @throws TopicNotSupportedFault
+ * @throws InvalidFilterFault
+ * @throws InvalidProducerPropertiesExpressionFault
+ * @throws ResourceUnknownFault
+ * @throws InvalidMessageContentExpressionFault
+ * @throws TopicExpressionDialectUnknownFault
+ * @throws UnacceptableInitialTerminationTimeFault
+ */
+ @WebMethod(operationName = "Subscribe")
+ @WebResult(name = "SubscribeResponse",
+ targetNamespace = "http://docs.oasis-open.org/wsn/b-1",
+ partName = "SubscribeResponse")
+ public SubscribeResponse subscribe(
+ @WebParam(name = "Subscribe",
+ targetNamespace = "http://docs.oasis-open.org/wsn/b-1",
+ partName = "SubscribeRequest")
+ Subscribe subscribeRequest) throws InvalidFilterFault, InvalidMessageContentExpressionFault,
+ InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, ResourceUnknownFault,
+ SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault,
+ UnacceptableInitialTerminationTimeFault, UnsupportedPolicyRequestFault, UnrecognizedPolicyRequestFault {
+
+ logger.debug("Subscribe");
+ return handleSubscribe(subscribeRequest, null);
+ }
+
+ public SubscribeResponse handleSubscribe(
+ Subscribe subscribeRequest,
+ EndpointManager manager) throws InvalidFilterFault, InvalidMessageContentExpressionFault,
+ InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault,
+ SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault,
+ TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault,
+ UnsupportedPolicyRequestFault, UnrecognizedPolicyRequestFault {
+ AbstractSubscription subscription = null;
+ boolean success = false;
+ try {
+ subscription = createSubscription(idGenerator.generateSanitizedId());
+ subscription.setBroker(this);
+ subscriptions.put(subscription.getAddress(), subscription);
+ subscription.create(subscribeRequest);
+ if (manager != null) {
+ subscription.setManager(manager);
+ }
+ subscription.register();
+ SubscribeResponse response = new SubscribeResponse();
+ response.setSubscriptionReference(subscription.getEpr());
+ success = true;
+ return response;
+ } catch (EndpointRegistrationException e) {
+ logger.warn("Unable to register new endpoint", e);
+ SubscribeCreationFailedFaultType fault = new SubscribeCreationFailedFaultType();
+ throw new SubscribeCreationFailedFault("Unable to register new endpoint", fault, e);
+ } finally {
+ if (!success && subscription != null) {
+ subscriptions.remove(subscription);
+ try {
+ subscription.unsubscribe();
+ } catch (UnableToDestroySubscriptionFault e) {
+ logger.info("Error destroying subscription", e);
+ }
+ }
+ }
+ }
+
+ public void unsubscribe(String address) throws UnableToDestroySubscriptionFault {
+ AbstractSubscription subscription = (AbstractSubscription) subscriptions.remove(address);
+ if (subscription != null) {
+ subscription.unsubscribe();
+ }
+ }
+
+ /**
+ *
+ * @param getCurrentMessageRequest
+ * @return returns org.oasis_open.docs.wsn.b_1.GetCurrentMessageResponse
+ * @throws MultipleTopicsSpecifiedFault
+ * @throws TopicNotSupportedFault
+ * @throws InvalidTopicExpressionFault
+ * @throws ResourceUnknownFault
+ * @throws TopicExpressionDialectUnknownFault
+ * @throws NoCurrentMessageOnTopicFault
+ */
+ @WebMethod(operationName = "GetCurrentMessage")
+ @WebResult(name = "GetCurrentMessageResponse",
+ targetNamespace = "http://docs.oasis-open.org/wsn/b-1",
+ partName = "GetCurrentMessageResponse")
+ public GetCurrentMessageResponse getCurrentMessage(
+ @WebParam(name = "GetCurrentMessage",
+ targetNamespace = "http://docs.oasis-open.org/wsn/b-1",
+ partName = "GetCurrentMessageRequest")
+ GetCurrentMessage getCurrentMessageRequest) throws InvalidTopicExpressionFault,
+ MultipleTopicsSpecifiedFault, NoCurrentMessageOnTopicFault, ResourceUnknownFault,
+ TopicExpressionDialectUnknownFault, TopicNotSupportedFault {
+
+ logger.debug("GetCurrentMessage");
+ NoCurrentMessageOnTopicFaultType fault = new NoCurrentMessageOnTopicFaultType();
+ throw new NoCurrentMessageOnTopicFault("There is no current message on this topic.", fault);
+ }
+
+ /**
+ *
+ * @param registerPublisherRequest
+ * @return returns org.oasis_open.docs.wsn.br_1.RegisterPublisherResponse
+ * @throws PublisherRegistrationRejectedFault
+ * @throws InvalidTopicExpressionFault
+ * @throws TopicNotSupportedFault
+ * @throws ResourceUnknownFault
+ * @throws PublisherRegistrationFailedFault
+ */
+ @WebMethod(operationName = "RegisterPublisher")
+ @WebResult(name = "RegisterPublisherResponse",
+ targetNamespace = "http://docs.oasis-open.org/wsn/br-1",
+ partName = "RegisterPublisherResponse")
+ public RegisterPublisherResponse registerPublisher(
+ @WebParam(name = "RegisterPublisher",
+ targetNamespace = "http://docs.oasis-open.org/wsn/br-1",
+ partName = "RegisterPublisherRequest")
+ RegisterPublisher registerPublisherRequest) throws InvalidTopicExpressionFault,
+ PublisherRegistrationFailedFault, PublisherRegistrationRejectedFault, ResourceUnknownFault,
+ TopicNotSupportedFault {
+
+ logger.debug("RegisterPublisher");
+ return handleRegisterPublisher(registerPublisherRequest);
+ }
+
+ public RegisterPublisherResponse handleRegisterPublisher(RegisterPublisher registerPublisherRequest) throws InvalidTopicExpressionFault, PublisherRegistrationFailedFault,
+ PublisherRegistrationRejectedFault, ResourceUnknownFault, TopicNotSupportedFault {
+ AbstractPublisher publisher = null;
+ boolean success = false;
+ try {
+ publisher = createPublisher(idGenerator.generateSanitizedId());
+ publisher.register();
+ publisher.create(registerPublisherRequest);
+ RegisterPublisherResponse response = new RegisterPublisherResponse();
+ response.setPublisherRegistrationReference(publisher.getEpr());
+ publishers.put(WSNHelper.getWSAAddress(publisher.getPublisherReference()), publisher);
+ success = true;
+ return response;
+ } catch (EndpointRegistrationException e) {
+ logger.warn("Unable to register new endpoint", e);
+ PublisherRegistrationFailedFaultType fault = new PublisherRegistrationFailedFaultType();
+ throw new PublisherRegistrationFailedFault("Unable to register new endpoint", fault, e);
+ } finally {
+ if (!success && publisher != null) {
+ try {
+ publisher.destroy();
+ } catch (ResourceNotDestroyedFault e) {
+ logger.info("Error destroying publisher", e);
+ }
+ }
+ }
+ }
+
+ protected abstract AbstractPublisher createPublisher(String name);
+
+ protected abstract AbstractSubscription createSubscription(String name);
+
+ @WebResult(name = "GetResourcePropertyResponse", targetNamespace = "http://docs.oasis-open.org/wsrf/rp-2", partName = "GetResourcePropertyResponse")
+ @WebMethod(operationName = "GetResourceProperty")
+ public GetResourcePropertyResponse getResourceProperty(
+ @WebParam(partName = "GetResourcePropertyRequest", name = "GetResourceProperty", targetNamespace = "http://docs.oasis-open.org/wsrf/rp-2")
+ javax.xml.namespace.QName getResourcePropertyRequest
+ ) throws ResourceUnavailableFault, ResourceUnknownFault, InvalidResourcePropertyQNameFault {
+
+ logger.debug("GetResourceProperty");
+ return handleGetResourceProperty(getResourcePropertyRequest);
+ }
+
+ protected GetResourcePropertyResponse handleGetResourceProperty(QName property)
+ throws ResourceUnavailableFault, ResourceUnknownFault, InvalidResourcePropertyQNameFault {
+ InvalidResourcePropertyQNameFaultType fault = new InvalidResourcePropertyQNameFaultType();
+ throw new InvalidResourcePropertyQNameFault("Invalid resource property QName: " + property, fault);
+ }
+
+}
Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractPublisher.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractPublisher.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractPublisher.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractPublisher.java Fri Oct 7 09:41:10 2011
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn;
+
+import java.util.List;
+
+import javax.jws.WebMethod;
+import javax.jws.WebParam;
+import javax.jws.WebResult;
+import javax.jws.WebService;
+import javax.xml.ws.wsaddressing.W3CEndpointReference;
+
+import org.oasis_open.docs.wsn.b_2.InvalidTopicExpressionFaultType;
+import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
+import org.oasis_open.docs.wsn.b_2.TopicExpressionType;
+import org.oasis_open.docs.wsn.br_2.DestroyRegistration;
+import org.oasis_open.docs.wsn.br_2.DestroyRegistrationResponse;
+import org.oasis_open.docs.wsn.br_2.PublisherRegistrationFailedFaultType;
+import org.oasis_open.docs.wsn.br_2.RegisterPublisher;
+import org.oasis_open.docs.wsn.br_2.ResourceNotDestroyedFaultType;
+import org.oasis_open.docs.wsn.brw_2.PublisherRegistrationFailedFault;
+import org.oasis_open.docs.wsn.brw_2.PublisherRegistrationManager;
+import org.oasis_open.docs.wsn.brw_2.PublisherRegistrationRejectedFault;
+import org.oasis_open.docs.wsn.brw_2.ResourceNotDestroyedFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidTopicExpressionFault;
+import org.oasis_open.docs.wsn.bw_2.TopicNotSupportedFault;
+import org.oasis_open.docs.wsrf.rw_2.ResourceUnknownFault;
+
+@WebService(endpointInterface = "org.oasis_open.docs.wsn.brw_2.PublisherRegistrationManager")
+public abstract class AbstractPublisher extends AbstractEndpoint implements PublisherRegistrationManager {
+
+ protected W3CEndpointReference publisherReference;
+
+ protected boolean demand;
+
+ protected List<TopicExpressionType> topic;
+
+ public AbstractPublisher(String name) {
+ super(name);
+ }
+
+ public W3CEndpointReference getPublisherReference() {
+ return publisherReference;
+ }
+
+ /**
+ *
+ * @param destroyRegistrationRequest
+ * @return returns org.oasis_open.docs.wsn.br_1.DestroyResponse
+ * @throws ResourceNotDestroyedFault
+ * @throws ResourceUnknownFault
+ */
+ @WebMethod(operationName = "DestroyRegistration")
+ @WebResult(name = "DestroyRegistrationResponse",
+ targetNamespace = "http://docs.oasis-open.org/wsn/br-2",
+ partName = "DestroyRegistrationResponse")
+ public DestroyRegistrationResponse destroyRegistration(
+ @WebParam(name = "DestroyRegistration",
+ targetNamespace = "http://docs.oasis-open.org/wsn/br-2",
+ partName = "DestroyRegistrationRequest")
+ DestroyRegistration destroyRegistrationRequest) throws ResourceNotDestroyedFault, ResourceUnknownFault {
+
+ destroy();
+ return new DestroyRegistrationResponse();
+ }
+
+ public abstract void notify(NotificationMessageHolderType messageHolder);
+
+ protected void destroy() throws ResourceNotDestroyedFault {
+ try {
+ unregister();
+ } catch (EndpointRegistrationException e) {
+ ResourceNotDestroyedFaultType fault = new ResourceNotDestroyedFaultType();
+ throw new ResourceNotDestroyedFault("Error unregistering endpoint", fault, e);
+ }
+ }
+
+ public void create(RegisterPublisher registerPublisherRequest) throws InvalidTopicExpressionFault,
+ PublisherRegistrationFailedFault, PublisherRegistrationRejectedFault, ResourceUnknownFault,
+ TopicNotSupportedFault {
+ validatePublisher(registerPublisherRequest);
+ start();
+ }
+
+ protected void validatePublisher(RegisterPublisher registerPublisherRequest) throws InvalidTopicExpressionFault,
+ PublisherRegistrationFailedFault, PublisherRegistrationRejectedFault, ResourceUnknownFault,
+ TopicNotSupportedFault {
+ // Check consumer reference
+ publisherReference = registerPublisherRequest.getPublisherReference();
+ // Check topic
+ topic = registerPublisherRequest.getTopic();
+ // Check demand based
+ demand = registerPublisherRequest.isDemand() != null ? registerPublisherRequest.isDemand().booleanValue()
+ : false;
+ // Check all parameters
+ if (publisherReference == null) {
+ PublisherRegistrationFailedFaultType fault = new PublisherRegistrationFailedFaultType();
+ throw new PublisherRegistrationFailedFault("Invalid PublisherReference: null", fault);
+ }
+ if (demand && (topic == null || topic.size() == 0)) {
+ InvalidTopicExpressionFaultType fault = new InvalidTopicExpressionFaultType();
+ throw new InvalidTopicExpressionFault(
+ "Must specify at least one topic for demand-based publishing", fault);
+ }
+ }
+
+ protected abstract void start() throws PublisherRegistrationFailedFault;
+}
Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractPullPoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractPullPoint.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractPullPoint.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractPullPoint.java Fri Oct 7 09:41:10 2011
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn;
+
+import java.math.BigInteger;
+import java.util.List;
+
+import javax.jws.Oneway;
+import javax.jws.WebMethod;
+import javax.jws.WebParam;
+import javax.jws.WebResult;
+import javax.jws.WebService;
+
+import org.oasis_open.docs.wsn.b_2.CreatePullPoint;
+import org.oasis_open.docs.wsn.b_2.DestroyPullPoint;
+import org.oasis_open.docs.wsn.b_2.DestroyPullPointResponse;
+import org.oasis_open.docs.wsn.b_2.GetMessages;
+import org.oasis_open.docs.wsn.b_2.GetMessagesResponse;
+import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
+import org.oasis_open.docs.wsn.b_2.Notify;
+import org.oasis_open.docs.wsn.b_2.UnableToDestroyPullPointFaultType;
+import org.oasis_open.docs.wsn.bw_2.NotificationConsumer;
+import org.oasis_open.docs.wsn.bw_2.PullPoint;
+import org.oasis_open.docs.wsn.bw_2.UnableToCreatePullPointFault;
+import org.oasis_open.docs.wsn.bw_2.UnableToDestroyPullPointFault;
+import org.oasis_open.docs.wsn.bw_2.UnableToGetMessagesFault;
+import org.oasis_open.docs.wsrf.rw_2.ResourceUnknownFault;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+ /**
+  * Base class for pull points: an endpoint that accepts {@code Notify} messages, hands
+  * every contained message to {@link #store(NotificationMessageHolderType)} and serves
+  * them back through {@code GetMessages}. Storage and retrieval are left to subclasses.
+  */
+ @WebService(endpointInterface = "org.oasis_open.docs.wsn.bw_2.PullPoint")
+ public abstract class AbstractPullPoint extends AbstractEndpoint implements PullPoint, NotificationConsumer {
+
+     // One logger for the class; there is no per-instance state in it.
+     private static final Logger LOGGER = LoggerFactory.getLogger(AbstractPullPoint.class);
+
+     /** Owner that {@code DestroyPullPoint} requests are delegated to; set through the setter. */
+     protected AbstractCreatePullPoint createPullPoint;
+
+     /**
+      * @param name passed unchanged to the {@code AbstractEndpoint} constructor
+      */
+     public AbstractPullPoint(String name) {
+         super(name);
+     }
+
+     /**
+      * Receives a {@code Notify} message and stores each notification it contains,
+      * in the order returned by {@code notify.getNotificationMessage()}.
+      *
+      * @param notify the incoming notification batch
+      */
+     @WebMethod(operationName = "Notify")
+     @Oneway
+     public void notify(
+             @WebParam(name = "Notify",
+                       targetNamespace = "http://docs.oasis-open.org/wsn/b-2",
+                       partName = "Notify")
+             Notify notify) {
+
+         LOGGER.debug("Notify");
+         for (NotificationMessageHolderType messageHolder : notify.getNotificationMessage()) {
+             store(messageHolder);
+         }
+     }
+
+     /**
+      * Handles the {@code GetMessages} operation. The request's MaximumNumber is converted
+      * to an {@code int} and passed to {@link #getMessages(int)}; a missing value is passed
+      * as {@code 0}, whose meaning is defined by the subclass.
+      *
+      * @param getMessagesRequest the incoming request
+      * @return a {@link GetMessagesResponse} containing the messages returned by
+      *         {@link #getMessages(int)}
+      * @throws ResourceUnknownFault as raised by {@link #getMessages(int)}
+      * @throws UnableToGetMessagesFault as raised by {@link #getMessages(int)}
+      */
+     @WebMethod(operationName = "GetMessages")
+     @WebResult(name = "GetMessagesResponse",
+                targetNamespace = "http://docs.oasis-open.org/wsn/b-2",
+                partName = "GetMessagesResponse")
+     public GetMessagesResponse getMessages(
+             @WebParam(name = "GetMessages",
+                       targetNamespace = "http://docs.oasis-open.org/wsn/b-2",
+                       partName = "GetMessagesRequest")
+             GetMessages getMessagesRequest) throws ResourceUnknownFault, UnableToGetMessagesFault {
+
+         LOGGER.debug("GetMessages");
+         BigInteger max = getMessagesRequest.getMaximumNumber();
+         List<NotificationMessageHolderType> messages = getMessages(max != null ? max.intValue() : 0);
+         GetMessagesResponse response = new GetMessagesResponse();
+         response.getNotificationMessage().addAll(messages);
+         return response;
+     }
+
+     /**
+      * Handles the {@code DestroyPullPoint} operation by asking the owning
+      * {@link AbstractCreatePullPoint} to destroy the pull point registered under this
+      * endpoint's address. {@link #setCreatePullPoint(AbstractCreatePullPoint)} must have
+      * been called beforehand.
+      *
+      * @param destroyPullPointRequest the incoming request; its content is not inspected
+      * @return a new {@link DestroyPullPointResponse}
+      * @throws ResourceUnknownFault declared by the service interface
+      * @throws UnableToDestroyPullPointFault if the pull point cannot be destroyed
+      */
+     @WebMethod(operationName = "DestroyPullPoint")
+     @WebResult(name = "DestroyPullPointResponse",
+                targetNamespace = "http://docs.oasis-open.org/wsn/b-2",
+                partName = "DestroyPullPointResponse")
+     public DestroyPullPointResponse destroyPullPoint(
+             @WebParam(name = "DestroyPullPoint",
+                       targetNamespace = "http://docs.oasis-open.org/wsn/b-2",
+                       partName = "DestroyPullPointRequest")
+             DestroyPullPoint destroyPullPointRequest) throws ResourceUnknownFault, UnableToDestroyPullPointFault {
+
+         LOGGER.debug("Destroy");
+         createPullPoint.destroyPullPoint(getAddress());
+         return new DestroyPullPointResponse();
+     }
+
+     /**
+      * Creation hook; this base implementation does nothing.
+      *
+      * @param createPullPointRequest the creation request
+      * @throws UnableToCreatePullPointFault declared for overriding implementations
+      */
+     public void create(CreatePullPoint createPullPointRequest) throws UnableToCreatePullPointFault {
+     }
+
+     /**
+      * Stores one received notification message; implemented by subclasses.
+      *
+      * @param messageHolder the message to store
+      */
+     protected abstract void store(NotificationMessageHolderType messageHolder);
+
+     /**
+      * Retrieves stored messages; implemented by subclasses.
+      *
+      * @param max the MaximumNumber from the request, or {@code 0} when none was given
+      * @return the messages to return to the caller
+      * @throws ResourceUnknownFault defined by the implementation
+      * @throws UnableToGetMessagesFault defined by the implementation
+      */
+     protected abstract List<NotificationMessageHolderType> getMessages(int max) throws ResourceUnknownFault,
+         UnableToGetMessagesFault;
+
+     /**
+      * Tears the pull point down by calling {@code unregister()}.
+      *
+      * @throws UnableToDestroyPullPointFault wrapping the {@link EndpointRegistrationException}
+      *         raised by {@code unregister()}
+      */
+     protected void destroy() throws UnableToDestroyPullPointFault {
+         try {
+             unregister();
+         } catch (EndpointRegistrationException e) {
+             UnableToDestroyPullPointFaultType fault = new UnableToDestroyPullPointFaultType();
+             throw new UnableToDestroyPullPointFault("Error unregistering endpoint", fault, e);
+         }
+     }
+
+     /**
+      * @return the owner that destroy requests are delegated to, or {@code null} if unset
+      */
+     public AbstractCreatePullPoint getCreatePullPoint() {
+         return createPullPoint;
+     }
+
+     /**
+      * @param createPullPoint the owner that destroy requests are delegated to
+      */
+     public void setCreatePullPoint(AbstractCreatePullPoint createPullPoint) {
+         this.createPullPoint = createPullPoint;
+     }
+ }
Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractSubscription.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractSubscription.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractSubscription.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/AbstractSubscription.java Fri Oct 7 09:41:10 2011
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn;
+
+import java.util.GregorianCalendar;
+import javax.jws.WebMethod;
+import javax.jws.WebParam;
+import javax.jws.WebResult;
+import javax.jws.WebService;
+import javax.xml.bind.JAXBElement;
+import javax.xml.datatype.DatatypeConfigurationException;
+import javax.xml.datatype.DatatypeConstants;
+import javax.xml.datatype.DatatypeFactory;
+import javax.xml.datatype.Duration;
+import javax.xml.datatype.XMLGregorianCalendar;
+import javax.xml.namespace.QName;
+import javax.xml.ws.wsaddressing.W3CEndpointReference;
+
+import org.oasis_open.docs.wsn.b_2.*;
+import org.oasis_open.docs.wsn.bw_2.InvalidFilterFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidMessageContentExpressionFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidProducerPropertiesExpressionFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidTopicExpressionFault;
+import org.oasis_open.docs.wsn.bw_2.PausableSubscriptionManager;
+import org.oasis_open.docs.wsn.bw_2.PauseFailedFault;
+import org.oasis_open.docs.wsn.bw_2.ResumeFailedFault;
+import org.oasis_open.docs.wsn.bw_2.SubscribeCreationFailedFault;
+import org.oasis_open.docs.wsn.bw_2.TopicExpressionDialectUnknownFault;
+import org.oasis_open.docs.wsn.bw_2.TopicNotSupportedFault;
+import org.oasis_open.docs.wsn.bw_2.UnableToDestroySubscriptionFault;
+import org.oasis_open.docs.wsn.bw_2.UnacceptableInitialTerminationTimeFault;
+import org.oasis_open.docs.wsn.bw_2.UnacceptableTerminationTimeFault;
+import org.oasis_open.docs.wsn.bw_2.UnrecognizedPolicyRequestFault;
+import org.oasis_open.docs.wsn.bw_2.UnsupportedPolicyRequestFault;
+import org.oasis_open.docs.wsrf.rw_2.ResourceUnknownFault;
+
+@WebService(endpointInterface = "org.oasis_open.docs.wsn.bw_2.PausableSubscriptionManager")
+public abstract class AbstractSubscription extends AbstractEndpoint implements PausableSubscriptionManager {
+
+ public static final String WSN_URI = "http://docs.oasis-open.org/wsn/b-2";
+
+ public static final String XPATH1_URI = "http://www.w3.org/TR/1999/REC-xpath-19991116";
+
+ public static final QName QNAME_TOPIC_EXPRESSION = new QName(WSN_URI, "TopicExpression");
+
+ public static final QName QNAME_PRODUCER_PROPERTIES = new QName(WSN_URI, "ProducerProperties");
+
+ public static final QName QNAME_MESSAGE_CONTENT = new QName(WSN_URI, "MessageContent");
+
+ public static final QName QNAME_USE_RAW = new QName(WSN_URI, "UseRaw");
+
+ protected DatatypeFactory datatypeFactory;
+
+ protected XMLGregorianCalendar terminationTime;
+
+ protected boolean useRaw;
+
+ protected TopicExpressionType topic;
+
+ protected QueryExpressionType contentFilter;
+
+ protected W3CEndpointReference consumerReference;
+
+ protected AbstractNotificationBroker broker;
+
+ public AbstractSubscription(String name) {
+ super(name);
+ try {
+ this.datatypeFactory = DatatypeFactory.newInstance();
+ } catch (DatatypeConfigurationException e) {
+ throw new RuntimeException("Unable to initialize subscription", e);
+ }
+ }
+
+ /**
+ *
+ * @param renewRequest
+ * @return returns org.oasis_open.docs.wsn.b_1.RenewResponse
+ * @throws UnacceptableTerminationTimeFault
+ * @throws ResourceUnknownFault
+ */
+ @WebMethod(operationName = "Renew")
+ @WebResult(name = "RenewResponse",
+ targetNamespace = "http://docs.oasis-open.org/wsn/b-2",
+ partName = "RenewResponse")
+ public RenewResponse renew(
+ @WebParam(name = "Renew",
+ targetNamespace = "http://docs.oasis-open.org/wsn/b-2",
+ partName = "RenewRequest")
+ Renew renewRequest) throws ResourceUnknownFault, UnacceptableTerminationTimeFault {
+
+ XMLGregorianCalendar time = validateTerminationTime(renewRequest.getTerminationTime());
+ renew(time);
+ RenewResponse response = new RenewResponse();
+ response.setTerminationTime(time);
+ response.setCurrentTime(getCurrentTime());
+ return response;
+ }
+
+ /**
+ *
+ * @param unsubscribeRequest
+ * @return returns org.oasis_open.docs.wsn.b_1.UnsubscribeResponse
+ * @throws UnableToDestroySubscriptionFault
+ * @throws ResourceUnknownFault
+ */
+ @WebMethod(operationName = "Unsubscribe")
+ @WebResult(name = "UnsubscribeResponse",
+ targetNamespace = "http://docs.oasis-open.org/wsn/b-2",
+ partName = "UnsubscribeResponse")
+ public UnsubscribeResponse unsubscribe(
+ @WebParam(name = "Unsubscribe",
+ targetNamespace = "http://docs.oasis-open.org/wsn/b-2",
+ partName = "UnsubscribeRequest")
+ Unsubscribe unsubscribeRequest) throws ResourceUnknownFault, UnableToDestroySubscriptionFault {
+
+ broker.unsubscribe(getAddress());
+ return new UnsubscribeResponse();
+ }
+
+ /**
+ *
+ * @param pauseSubscriptionRequest
+ * @return returns org.oasis_open.docs.wsn.b_1.PauseSubscriptionResponse
+ * @throws PauseFailedFault
+ * @throws ResourceUnknownFault
+ */
+ @WebMethod(operationName = "PauseSubscription")
+ @WebResult(name = "PauseSubscriptionResponse",
+ targetNamespace = "http://docs.oasis-open.org/wsn/b-2",
+ partName = "PauseSubscriptionResponse")
+ public PauseSubscriptionResponse pauseSubscription(
+ @WebParam(name = "PauseSubscription",
+ targetNamespace = "http://docs.oasis-open.org/wsn/b-2",
+ partName = "PauseSubscriptionRequest")
+ PauseSubscription pauseSubscriptionRequest) throws PauseFailedFault, ResourceUnknownFault {
+
+ pause();
+ return new PauseSubscriptionResponse();
+ }
+
+ /**
+ *
+ * @param resumeSubscriptionRequest
+ * @return returns org.oasis_open.docs.wsn.b_1.ResumeSubscriptionResponse
+ * @throws ResumeFailedFault
+ * @throws ResourceUnknownFault
+ */
+ @WebMethod(operationName = "ResumeSubscription")
+ @WebResult(name = "ResumeSubscriptionResponse",
+ targetNamespace = "http://docs.oasis-open.org/wsn/b-2",
+ partName = "ResumeSubscriptionResponse")
+ public ResumeSubscriptionResponse resumeSubscription(
+ @WebParam(name = "ResumeSubscription",
+ targetNamespace = "http://docs.oasis-open.org/wsn/b-2",
+ partName = "ResumeSubscriptionRequest")
+ ResumeSubscription resumeSubscriptionRequest) throws ResourceUnknownFault, ResumeFailedFault {
+
+ resume();
+ return new ResumeSubscriptionResponse();
+ }
+
+ protected XMLGregorianCalendar validateInitialTerminationTime(String value)
+ throws UnacceptableInitialTerminationTimeFault {
+ XMLGregorianCalendar tt = parseTerminationTime(value);
+ if (tt == null) {
+ UnacceptableInitialTerminationTimeFaultType fault = new UnacceptableInitialTerminationTimeFaultType();
+ throw new UnacceptableInitialTerminationTimeFault("Unable to parse initial termination time: '" + value
+ + "'", fault);
+ }
+ XMLGregorianCalendar ct = getCurrentTime();
+ int c = tt.compare(ct);
+ if (c == DatatypeConstants.LESSER || c == DatatypeConstants.EQUAL) {
+ UnacceptableInitialTerminationTimeFaultType fault = new UnacceptableInitialTerminationTimeFaultType();
+ fault.setMinimumTime(ct);
+ throw new UnacceptableInitialTerminationTimeFault("Invalid initial termination time", fault);
+ }
+ return tt;
+ }
+
+ protected XMLGregorianCalendar validateTerminationTime(String value) throws UnacceptableTerminationTimeFault {
+ XMLGregorianCalendar tt = parseTerminationTime(value);
+ if (tt == null) {
+ UnacceptableTerminationTimeFaultType fault = new UnacceptableTerminationTimeFaultType();
+ throw new UnacceptableTerminationTimeFault("Unable to parse termination time: '" + value + "'", fault);
+ }
+ XMLGregorianCalendar ct = getCurrentTime();
+ int c = tt.compare(ct);
+ if (c == DatatypeConstants.LESSER || c == DatatypeConstants.EQUAL) {
+ UnacceptableTerminationTimeFaultType fault = new UnacceptableTerminationTimeFaultType();
+ fault.setMinimumTime(ct);
+ throw new UnacceptableTerminationTimeFault("Invalid termination time", fault);
+ }
+ return tt;
+ }
+
+ protected XMLGregorianCalendar parseTerminationTime(String value) {
+ try {
+ Duration d = datatypeFactory.newDuration(value);
+ XMLGregorianCalendar c = getCurrentTime();
+ c.add(d);
+ return c;
+ } catch (Exception e) {
+ // Ignore
+ }
+ try {
+ Duration d = datatypeFactory.newDurationDayTime(value);
+ XMLGregorianCalendar c = getCurrentTime();
+ c.add(d);
+ return c;
+ } catch (Exception e) {
+ // Ignore
+ }
+ try {
+ Duration d = datatypeFactory.newDurationYearMonth(value);
+ XMLGregorianCalendar c = getCurrentTime();
+ c.add(d);
+ return c;
+ } catch (Exception e) {
+ // Ignore
+ }
+ try {
+ return datatypeFactory.newXMLGregorianCalendar(value);
+ } catch (Exception e) {
+ // Ignore
+ }
+ return null;
+ }
+
+ protected XMLGregorianCalendar getCurrentTime() {
+ return datatypeFactory.newXMLGregorianCalendar(new GregorianCalendar());
+ }
+
+ public XMLGregorianCalendar getTerminationTime() {
+ return terminationTime;
+ }
+
+ public void setTerminationTime(XMLGregorianCalendar terminationTime) {
+ this.terminationTime = terminationTime;
+ }
+
+ public void create(Subscribe subscribeRequest) throws InvalidFilterFault, InvalidMessageContentExpressionFault,
+ InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, SubscribeCreationFailedFault,
+ TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault,
+ UnrecognizedPolicyRequestFault, UnsupportedPolicyRequestFault {
+ validateSubscription(subscribeRequest);
+ start();
+ }
+
+ protected abstract void start() throws SubscribeCreationFailedFault;
+
+ protected abstract void pause() throws PauseFailedFault;
+
+ protected abstract void resume() throws ResumeFailedFault;
+
+ protected abstract void renew(XMLGregorianCalendar time) throws UnacceptableTerminationTimeFault;
+
+ protected void unsubscribe() throws UnableToDestroySubscriptionFault {
+ try {
+ unregister();
+ } catch (EndpointRegistrationException e) {
+ UnableToDestroySubscriptionFaultType fault = new UnableToDestroySubscriptionFaultType();
+ throw new UnableToDestroySubscriptionFault("Error unregistering endpoint", fault, e);
+ }
+ }
+
+ protected void validateSubscription(Subscribe subscribeRequest) throws InvalidFilterFault,
+ InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault,
+ InvalidTopicExpressionFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault,
+ TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault, UnrecognizedPolicyRequestFault,
+ UnsupportedPolicyRequestFault {
+ // Check consumer reference
+ consumerReference = subscribeRequest.getConsumerReference();
+ // Check terminationTime
+ if (subscribeRequest.getInitialTerminationTime() != null
+ && !subscribeRequest.getInitialTerminationTime().isNil()
+ && subscribeRequest.getInitialTerminationTime().getValue() != null) {
+ String strTerminationTime = subscribeRequest.getInitialTerminationTime().getValue();
+ terminationTime = validateInitialTerminationTime(strTerminationTime.trim());
+ }
+ // Check filter
+ if (subscribeRequest.getFilter() != null) {
+ for (Object f : subscribeRequest.getFilter().getAny()) {
+ JAXBElement e = null;
+ if (f instanceof JAXBElement) {
+ e = (JAXBElement) f;
+ f = e.getValue();
+ }
+ if (f instanceof TopicExpressionType) {
+ if (!e.getName().equals(QNAME_TOPIC_EXPRESSION)) {
+ InvalidTopicExpressionFaultType fault = new InvalidTopicExpressionFaultType();
+ throw new InvalidTopicExpressionFault("Unrecognized TopicExpression: " + e, fault);
+ }
+ topic = (TopicExpressionType) f;
+ } else if (f instanceof QueryExpressionType) {
+ if (e != null && e.getName().equals(QNAME_PRODUCER_PROPERTIES)) {
+ InvalidProducerPropertiesExpressionFaultType fault =
+ new InvalidProducerPropertiesExpressionFaultType();
+ throw new InvalidProducerPropertiesExpressionFault("ProducerProperties are not supported",
+ fault);
+ } else if (e != null && e.getName().equals(QNAME_MESSAGE_CONTENT)) {
+ if (contentFilter != null) {
+ InvalidMessageContentExpressionFaultType fault =
+ new InvalidMessageContentExpressionFaultType();
+ throw new InvalidMessageContentExpressionFault(
+ "Only one MessageContent filter can be specified", fault);
+ }
+ contentFilter = (QueryExpressionType) f;
+ // Defaults to XPath 1.0
+ if (contentFilter.getDialect() == null) {
+ contentFilter.setDialect(XPATH1_URI);
+ }
+ } else {
+ InvalidFilterFaultType fault = new InvalidFilterFaultType();
+ throw new InvalidFilterFault("Unrecognized filter: " + (e != null ? e.getName() : f), fault);
+ }
+ } else {
+ InvalidFilterFaultType fault = new InvalidFilterFaultType();
+ throw new InvalidFilterFault("Unrecognized filter: " + (e != null ? e.getName() : f), fault);
+ }
+ }
+ }
+ // Check policy
+ if (subscribeRequest.getSubscriptionPolicy() != null) {
+ for (Object p : subscribeRequest.getSubscriptionPolicy().getAny()) {
+ JAXBElement e = null;
+ if (p instanceof JAXBElement) {
+ e = (JAXBElement) p;
+ p = e.getValue();
+ }
+ if (p instanceof UseRaw) {
+ useRaw = true;
+ } else {
+ UnrecognizedPolicyRequestFaultType fault = new UnrecognizedPolicyRequestFaultType();
+ throw new UnrecognizedPolicyRequestFault("Unrecognized policy: " + p, fault);
+ }
+ }
+ }
+ // Check all parameters
+ if (consumerReference == null) {
+ SubscribeCreationFailedFaultType fault = new SubscribeCreationFailedFaultType();
+ throw new SubscribeCreationFailedFault("Invalid ConsumerReference: null", fault);
+ }
+ // TODO check we can resolve endpoint
+ if (topic == null) {
+ InvalidFilterFaultType fault = new InvalidFilterFaultType();
+ throw new InvalidFilterFault("Must specify a topic to subscribe on", fault);
+ }
+ if (contentFilter != null && !contentFilter.getDialect().equals(XPATH1_URI)) {
+ InvalidMessageContentExpressionFaultType fault = new InvalidMessageContentExpressionFaultType();
+ throw new InvalidMessageContentExpressionFault("Unsupported MessageContent dialect: '"
+ + contentFilter.getDialect() + "'", fault);
+ }
+ if (terminationTime != null) {
+ UnacceptableInitialTerminationTimeFaultType fault = new UnacceptableInitialTerminationTimeFaultType();
+ throw new UnacceptableInitialTerminationTimeFault("InitialTerminationTime is not supported", fault);
+ }
+ }
+
+ public AbstractNotificationBroker getBroker() {
+ return broker;
+ }
+
+ public void setBroker(AbstractNotificationBroker broker) {
+ this.broker = broker;
+ }
+}
Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/EndpointManager.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/EndpointManager.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/EndpointManager.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/EndpointManager.java Fri Oct 7 09:41:10 2011
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn;
+
+import javax.xml.ws.wsaddressing.W3CEndpointReference;
+
+ public interface EndpointManager { // Contract for exposing service objects as endpoints; implementations live outside this file.
+ /**
+ * Registers {@code service} at {@code address}.
+ *
+ * @param address the address to register the service at
+ * @param service the service object to expose
+ * @return an implementation-defined endpoint object; presumably the value later passed to
+ * {@link #unregister(Object)} and {@link #getEpr(Object)} (TODO confirm against implementations)
+ * @throws EndpointRegistrationException if the registration fails
+ */
+ Object register(String address, Object service) throws EndpointRegistrationException;
+ /**
+ * Removes a previously registered endpoint.
+ *
+ * @param endpoint the endpoint object to remove
+ * @throws EndpointRegistrationException if the endpoint cannot be unregistered
+ */
+ void unregister(Object endpoint) throws EndpointRegistrationException;
+ /**
+ * Returns the WS-Addressing endpoint reference for an endpoint.
+ *
+ * @param endpoint the endpoint object
+ * @return the endpoint reference of {@code endpoint}
+ */
+ W3CEndpointReference getEpr(Object endpoint);
+ }
Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/EndpointRegistrationException.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/EndpointRegistrationException.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/EndpointRegistrationException.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/EndpointRegistrationException.java Fri Oct 7 09:41:10 2011
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn;
+
+ /**
+  * Checked exception signalling that an endpoint could not be registered or unregistered.
+  * It adds nothing to {@link Exception} beyond its own type; every constructor forwards
+  * to the matching {@code Exception} constructor.
+  */
+ public class EndpointRegistrationException extends Exception {
+
+     /** Serialization version identifier (generated). */
+     private static final long serialVersionUID = 6365080415473176527L;
+
+     /** Creates an exception with neither a detail message nor a cause. */
+     public EndpointRegistrationException() {
+     }
+
+     /**
+      * @param message the detail message
+      */
+     public EndpointRegistrationException(String message) {
+         super(message);
+     }
+
+     /**
+      * @param cause the underlying failure
+      */
+     public EndpointRegistrationException(Throwable cause) {
+         super(cause);
+     }
+
+     /**
+      * @param message the detail message
+      * @param cause the underlying failure
+      */
+     public EndpointRegistrationException(String message, Throwable cause) {
+         super(message, cause);
+     }
+
+ }
Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Consumer.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Consumer.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Consumer.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/Consumer.java Fri Oct 7 09:41:10 2011
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn.client;
+
+import javax.jws.WebParam;
+import javax.jws.WebService;
+import javax.xml.ws.Endpoint;
+import javax.xml.ws.wsaddressing.W3CEndpointReference;
+
+import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
+import org.oasis_open.docs.wsn.b_2.Notify;
+import org.oasis_open.docs.wsn.bw_2.NotificationConsumer;
+
+@WebService(endpointInterface = "org.oasis_open.docs.wsn.bw_2.NotificationConsumer")
+public class Consumer implements NotificationConsumer, Referencable {
+
+ public interface Callback {
+ void notify(NotificationMessageHolderType message);
+ }
+
+ private final Callback callback;
+ private final Endpoint endpoint;
+
+ public Consumer(Callback callback, String address) {
+ this.callback = callback;
+ this.endpoint = Endpoint.create(this);
+ this.endpoint.publish(address);
+ }
+
+ public void stop() {
+ this.endpoint.stop();
+ }
+
+ public W3CEndpointReference getEpr() {
+ return this.endpoint.getEndpointReference(W3CEndpointReference.class);
+ }
+
+ public void notify(@WebParam(partName = "Notify", name = "Notify", targetNamespace = "http://docs.oasis-open.org/wsn/b-2") Notify notify) {
+ for (NotificationMessageHolderType message : notify.getNotificationMessage()) {
+ this.callback.notify(message);
+ }
+ }
+}
Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/CreatePullPoint.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/CreatePullPoint.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/CreatePullPoint.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/CreatePullPoint.java Fri Oct 7 09:41:10 2011
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn.client;
+
+import javax.xml.ws.wsaddressing.W3CEndpointReference;
+
+import org.apache.cxf.wsn.util.WSNHelper;
+import org.oasis_open.docs.wsn.b_2.CreatePullPointResponse;
+import org.oasis_open.docs.wsn.bw_2.UnableToCreatePullPointFault;
+
+/**
+ * Client wrapper around a {@code CreatePullPoint} port.
+ * <p>
+ * The port is resolved once, at construction time, through {@link WSNHelper#getPort} and
+ * is then used by {@link #create()} to request new pull points.
+ */
+public class CreatePullPoint implements Referencable {
+
+    private final org.oasis_open.docs.wsn.bw_2.CreatePullPoint createPullPoint;
+    private final W3CEndpointReference epr;
+
+    /**
+     * Creates a client for the service at the given address.
+     *
+     * @param address service address, turned into an endpoint reference by
+     *                {@link WSNHelper#createWSA}
+     */
+    public CreatePullPoint(String address) {
+        this(WSNHelper.createWSA(address));
+    }
+
+    /**
+     * Creates a client for the service identified by the given endpoint reference.
+     *
+     * @param epr endpoint reference of the service; also returned by {@link #getEpr()}
+     */
+    public CreatePullPoint(W3CEndpointReference epr) {
+        this.epr = epr;
+        this.createPullPoint =
+            WSNHelper.getPort(epr, org.oasis_open.docs.wsn.bw_2.CreatePullPoint.class);
+    }
+
+    /**
+     * @return the underlying port used to talk to the service
+     */
+    public org.oasis_open.docs.wsn.bw_2.CreatePullPoint getCreatePullPoint() {
+        return createPullPoint;
+    }
+
+    /**
+     * @return the endpoint reference this client was created with
+     */
+    public W3CEndpointReference getEpr() {
+        return epr;
+    }
+
+    /**
+     * Sends an empty create request to the service and wraps the pull point reference
+     * found in the response.
+     *
+     * @return a {@link PullPoint} built from the reference returned by the service
+     * @throws UnableToCreatePullPointFault propagated unchanged from the port call
+     */
+    public PullPoint create() throws UnableToCreatePullPointFault {
+        CreatePullPointResponse reply =
+            createPullPoint.createPullPoint(new org.oasis_open.docs.wsn.b_2.CreatePullPoint());
+        return new PullPoint(reply.getPullPoint());
+    }
+}
Added: cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java?rev=1179981&view=auto
==============================================================================
--- cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java (added)
+++ cxf/trunk/services/wsn/src/main/java/org/apache/cxf/wsn/client/NotificationBroker.java Fri Oct 7 09:41:10 2011
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cxf.wsn.client;
+
+import java.util.Collections;
+import java.util.List;
+import javax.xml.bind.JAXBElement;
+import javax.xml.ws.wsaddressing.W3CEndpointReference;
+
+import org.apache.cxf.wsn.AbstractSubscription;
+import org.apache.cxf.wsn.util.WSNHelper;
+import org.oasis_open.docs.wsn.b_2.FilterType;
+import org.oasis_open.docs.wsn.b_2.GetCurrentMessage;
+import org.oasis_open.docs.wsn.b_2.GetCurrentMessageResponse;
+import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
+import org.oasis_open.docs.wsn.b_2.Notify;
+import org.oasis_open.docs.wsn.b_2.QueryExpressionType;
+import org.oasis_open.docs.wsn.b_2.Subscribe;
+import org.oasis_open.docs.wsn.b_2.SubscribeResponse;
+import org.oasis_open.docs.wsn.b_2.TopicExpressionType;
+import org.oasis_open.docs.wsn.b_2.UseRaw;
+import org.oasis_open.docs.wsn.br_2.RegisterPublisher;
+import org.oasis_open.docs.wsn.br_2.RegisterPublisherResponse;
+import org.oasis_open.docs.wsn.brw_2.PublisherRegistrationFailedFault;
+import org.oasis_open.docs.wsn.brw_2.PublisherRegistrationRejectedFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidFilterFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidMessageContentExpressionFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidProducerPropertiesExpressionFault;
+import org.oasis_open.docs.wsn.bw_2.InvalidTopicExpressionFault;
+import org.oasis_open.docs.wsn.bw_2.MultipleTopicsSpecifiedFault;
+import org.oasis_open.docs.wsn.bw_2.NoCurrentMessageOnTopicFault;
+import org.oasis_open.docs.wsn.bw_2.NotifyMessageNotSupportedFault;
+import org.oasis_open.docs.wsn.bw_2.SubscribeCreationFailedFault;
+import org.oasis_open.docs.wsn.bw_2.TopicExpressionDialectUnknownFault;
+import org.oasis_open.docs.wsn.bw_2.TopicNotSupportedFault;
+import org.oasis_open.docs.wsn.bw_2.UnacceptableInitialTerminationTimeFault;
+import org.oasis_open.docs.wsn.bw_2.UnrecognizedPolicyRequestFault;
+import org.oasis_open.docs.wsn.bw_2.UnsupportedPolicyRequestFault;
+import org.oasis_open.docs.wsrf.rw_2.ResourceUnknownFault;
+
+/**
+ * Client wrapper around a {@code NotificationBroker} port.
+ * <p>
+ * The port is resolved once, at construction time, through {@link WSNHelper#getPort}. The
+ * convenience methods of this class build the request objects for the notify, subscribe,
+ * getCurrentMessage and registerPublisher operations and delegate to that port.
+ * <p>
+ * In every method a {@code null} topic means "no topic": no topic expression is added to
+ * the request.
+ */
+public class NotificationBroker implements Referencable {
+
+    private final org.oasis_open.docs.wsn.brw_2.NotificationBroker broker;
+    private final W3CEndpointReference epr;
+
+    /**
+     * Creates a client for the broker at the given address.
+     *
+     * @param address broker address, turned into an endpoint reference by
+     *                {@link WSNHelper#createWSA}
+     */
+    public NotificationBroker(String address) {
+        this(WSNHelper.createWSA(address));
+    }
+
+    /**
+     * Creates a client for the broker identified by the given endpoint reference.
+     *
+     * @param epr endpoint reference of the broker; also returned by {@link #getEpr()}
+     */
+    public NotificationBroker(W3CEndpointReference epr) {
+        this.broker = WSNHelper.getPort(epr, org.oasis_open.docs.wsn.brw_2.NotificationBroker.class);
+        this.epr = epr;
+    }
+
+    /**
+     * @return the underlying port used to talk to the broker
+     */
+    public org.oasis_open.docs.wsn.brw_2.NotificationBroker getBroker() {
+        return broker;
+    }
+
+    /**
+     * @return the endpoint reference this client was created with
+     */
+    public W3CEndpointReference getEpr() {
+        return epr;
+    }
+
+    /**
+     * Sends a notification without a producer reference.
+     *
+     * @param topic topic of the message, or {@code null} for none
+     * @param msg   payload placed in the message holder
+     */
+    public void notify(String topic, Object msg) {
+        notify(null, topic, msg);
+    }
+
+    /**
+     * Sends a notification containing a single message holder.
+     *
+     * @param publisher producer whose endpoint reference is set on the holder, or
+     *                  {@code null} to leave the producer reference unset
+     * @param topic     topic of the message, or {@code null} for none
+     * @param msg       payload placed in the message holder
+     */
+    public void notify(Referencable publisher, String topic, Object msg) {
+        NotificationMessageHolderType holder = new NotificationMessageHolderType();
+        if (publisher != null) {
+            holder.setProducerReference(publisher.getEpr());
+        }
+        if (topic != null) {
+            holder.setTopic(createTopicExpression(topic));
+        }
+        holder.setMessage(new NotificationMessageHolderType.Message());
+        holder.getMessage().setAny(msg);
+
+        Notify notify = new Notify();
+        notify.getNotificationMessage().add(holder);
+        broker.notify(notify);
+    }
+
+    /**
+     * Subscribes the consumer to a topic, without content filter and without raw delivery.
+     * Equivalent to {@code subscribe(consumer, topic, null, false)}.
+     */
+    public Subscription subscribe(Referencable consumer, String topic)
+        throws TopicExpressionDialectUnknownFault, InvalidFilterFault, TopicNotSupportedFault,
+            UnacceptableInitialTerminationTimeFault, SubscribeCreationFailedFault,
+            InvalidMessageContentExpressionFault, InvalidTopicExpressionFault,
+            ResourceUnknownFault, UnsupportedPolicyRequestFault, UnrecognizedPolicyRequestFault,
+            NotifyMessageNotSupportedFault, InvalidProducerPropertiesExpressionFault {
+        return subscribe(consumer, topic, null, false);
+    }
+
+    /**
+     * Subscribes the consumer to a topic with an optional XPath content filter, without raw
+     * delivery. Equivalent to {@code subscribe(consumer, topic, xpath, false)}.
+     */
+    public Subscription subscribe(Referencable consumer, String topic, String xpath)
+        throws TopicExpressionDialectUnknownFault, InvalidFilterFault, TopicNotSupportedFault,
+            UnacceptableInitialTerminationTimeFault, SubscribeCreationFailedFault,
+            InvalidMessageContentExpressionFault, InvalidTopicExpressionFault,
+            ResourceUnknownFault, UnsupportedPolicyRequestFault, UnrecognizedPolicyRequestFault,
+            NotifyMessageNotSupportedFault, InvalidProducerPropertiesExpressionFault {
+        return subscribe(consumer, topic, xpath, false);
+    }
+
+    /**
+     * Builds a subscribe request and sends it to the broker.
+     * <p>
+     * The request always carries a filter element; a topic expression and a message content
+     * expression are added to it only when the corresponding argument is not {@code null}.
+     * All faults declared by the port's subscribe operation are propagated unchanged.
+     *
+     * @param consumer consumer whose endpoint reference is used as consumer reference;
+     *                 must not be {@code null} (it is dereferenced unconditionally)
+     * @param topic    topic to subscribe to, or {@code null} for no topic filter
+     * @param xpath    message content expression using the
+     *                 {@code AbstractSubscription.XPATH1_URI} dialect, or {@code null} for none
+     * @param raw      when {@code true}, a {@link UseRaw} element is added to the
+     *                 subscription policy
+     * @return a {@link Subscription} built from the subscription reference in the response
+     */
+    public Subscription subscribe(Referencable consumer, String topic, String xpath, boolean raw)
+        throws TopicNotSupportedFault, InvalidFilterFault, TopicExpressionDialectUnknownFault,
+            UnacceptableInitialTerminationTimeFault, SubscribeCreationFailedFault,
+            InvalidMessageContentExpressionFault, InvalidTopicExpressionFault,
+            UnrecognizedPolicyRequestFault, UnsupportedPolicyRequestFault, ResourceUnknownFault,
+            NotifyMessageNotSupportedFault, InvalidProducerPropertiesExpressionFault {
+
+        Subscribe subscribeRequest = new Subscribe();
+        subscribeRequest.setConsumerReference(consumer.getEpr());
+        subscribeRequest.setFilter(new FilterType());
+        if (topic != null) {
+            subscribeRequest.getFilter().getAny().add(
+                new JAXBElement<TopicExpressionType>(AbstractSubscription.QNAME_TOPIC_EXPRESSION,
+                                                     TopicExpressionType.class,
+                                                     createTopicExpression(topic)));
+        }
+        if (xpath != null) {
+            QueryExpressionType xpathExp = new QueryExpressionType();
+            xpathExp.setDialect(AbstractSubscription.XPATH1_URI);
+            xpathExp.getContent().add(xpath);
+            subscribeRequest.getFilter().getAny().add(
+                new JAXBElement<QueryExpressionType>(AbstractSubscription.QNAME_MESSAGE_CONTENT,
+                                                     QueryExpressionType.class,
+                                                     xpathExp));
+        }
+        if (raw) {
+            subscribeRequest.setSubscriptionPolicy(new Subscribe.SubscriptionPolicy());
+            subscribeRequest.getSubscriptionPolicy().getAny().add(new UseRaw());
+        }
+        SubscribeResponse response = broker.subscribe(subscribeRequest);
+        return new Subscription(response.getSubscriptionReference());
+    }
+
+    /**
+     * Asks the broker for the current message on a topic.
+     * All faults declared by the port's getCurrentMessage operation are propagated unchanged.
+     *
+     * @param topic topic to query, or {@code null} to send the request without a topic
+     * @return the content list of the broker's response
+     */
+    public List<Object> getCurrentMessage(String topic)
+        throws TopicNotSupportedFault, TopicExpressionDialectUnknownFault,
+            MultipleTopicsSpecifiedFault, InvalidTopicExpressionFault, ResourceUnknownFault,
+            NoCurrentMessageOnTopicFault {
+        GetCurrentMessage getCurrentMessageRequest = new GetCurrentMessage();
+        if (topic != null) {
+            getCurrentMessageRequest.setTopic(createTopicExpression(topic));
+        }
+        GetCurrentMessageResponse response = broker.getCurrentMessage(getCurrentMessageRequest);
+        return response.getAny();
+    }
+
+    /**
+     * Registers a publisher for a single topic with the demand flag set to {@code false}.
+     * Equivalent to {@code registerPublisher(publisher, topic, false)}.
+     */
+    public Registration registerPublisher(Referencable publisher, String topic)
+        throws TopicNotSupportedFault, PublisherRegistrationFailedFault,
+            UnacceptableInitialTerminationTimeFault, InvalidTopicExpressionFault,
+            ResourceUnknownFault, PublisherRegistrationRejectedFault {
+        return registerPublisher(publisher, topic, false);
+    }
+
+    /**
+     * Registers a publisher for a single topic.
+     *
+     * @param publisher publisher to register; must not be {@code null}
+     * @param topic     topic the publisher registers for, or {@code null} to register
+     *                  without any topic
+     * @param demand    value of the demand flag set on the request
+     * @return a {@link Registration} built from the registration reference in the response
+     */
+    public Registration registerPublisher(Referencable publisher, String topic, boolean demand)
+        throws TopicNotSupportedFault, PublisherRegistrationFailedFault,
+            UnacceptableInitialTerminationTimeFault, InvalidTopicExpressionFault,
+            ResourceUnknownFault, PublisherRegistrationRejectedFault {
+        // A null topic must not be wrapped in a list: that would produce a topic expression
+        // with a null content entry, whereas every other method treats null as "no topic".
+        List<String> topics = null;
+        if (topic != null) {
+            topics = Collections.singletonList(topic);
+        }
+        return registerPublisher(publisher, topics, demand);
+    }
+
+    /**
+     * Builds a register-publisher request and sends it to the broker.
+     * All faults declared by the port's registerPublisher operation are propagated unchanged.
+     *
+     * @param publisher publisher whose endpoint reference is used as publisher reference;
+     *                  must not be {@code null} (it is dereferenced unconditionally)
+     * @param topics    topics the publisher registers for; may be {@code null} or empty,
+     *                  and {@code null} entries are ignored
+     * @param demand    value of the demand flag set on the request
+     * @return a {@link Registration} built from the registration reference in the response
+     */
+    public Registration registerPublisher(Referencable publisher, List<String> topics, boolean demand)
+        throws TopicNotSupportedFault, PublisherRegistrationFailedFault,
+            UnacceptableInitialTerminationTimeFault, InvalidTopicExpressionFault,
+            ResourceUnknownFault, PublisherRegistrationRejectedFault {
+
+        RegisterPublisher registerPublisherRequest = new RegisterPublisher();
+        registerPublisherRequest.setPublisherReference(publisher.getEpr());
+        if (topics != null) {
+            for (String topic : topics) {
+                if (topic != null) {
+                    registerPublisherRequest.getTopic().add(createTopicExpression(topic));
+                }
+            }
+        }
+        registerPublisherRequest.setDemand(demand);
+        RegisterPublisherResponse response = broker.registerPublisher(registerPublisherRequest);
+        return new Registration(response.getPublisherRegistrationReference());
+    }
+
+    /**
+     * Wraps a topic string in a topic expression whose only content entry is that string.
+     * No dialect is set on the expression.
+     */
+    private static TopicExpressionType createTopicExpression(String topic) {
+        TopicExpressionType topicExp = new TopicExpressionType();
+        topicExp.getContent().add(topic);
+        return topicExp;
+    }
+
+}