You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ch...@apache.org on 2006/02/22 00:40:29 UTC
svn commit: r379627 [22/34] - in /incubator/servicemix/trunk: ./ etc/
sandbox/servicemix-wsn-1.2/src/sa/META-INF/
sandbox/servicemix-wsn-1.2/src/su/META-INF/ servicemix-assembly/
servicemix-assembly/src/main/assembly/ servicemix-assembly/src/main/relea...
Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractNotificationBroker.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractNotificationBroker.java?rev=379627&r1=379626&r2=379627&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractNotificationBroker.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractNotificationBroker.java Tue Feb 21 15:40:05 2006
@@ -1,276 +1,276 @@
-/*
- * Copyright 2005-2006 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicemix.wsn;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.jws.Oneway;
-import javax.jws.WebMethod;
-import javax.jws.WebParam;
-import javax.jws.WebResult;
-import javax.jws.WebService;
-
-import org.apache.activemq.util.IdGenerator;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.servicemix.wsn.jaxws.InvalidFilterFault;
-import org.apache.servicemix.wsn.jaxws.InvalidMessageContentExpressionFault;
-import org.apache.servicemix.wsn.jaxws.InvalidProducerPropertiesExpressionFault;
-import org.apache.servicemix.wsn.jaxws.InvalidTopicExpressionFault;
-import org.apache.servicemix.wsn.jaxws.MultipleTopicsSpecifiedFault;
-import org.apache.servicemix.wsn.jaxws.NoCurrentMessageOnTopicFault;
-import org.apache.servicemix.wsn.jaxws.NotificationBroker;
-import org.apache.servicemix.wsn.jaxws.PublisherRegistrationFailedFault;
-import org.apache.servicemix.wsn.jaxws.PublisherRegistrationRejectedFault;
-import org.apache.servicemix.wsn.jaxws.ResourceNotDestroyedFault;
-import org.apache.servicemix.wsn.jaxws.ResourceUnknownFault;
-import org.apache.servicemix.wsn.jaxws.SubscribeCreationFailedFault;
-import org.apache.servicemix.wsn.jaxws.TopicExpressionDialectUnknownFault;
-import org.apache.servicemix.wsn.jaxws.TopicNotSupportedFault;
-import org.apache.servicemix.wsn.jaxws.UnableToDestroySubscriptionFault;
-import org.apache.servicemix.wsn.jaxws.UnacceptableInitialTerminationTimeFault;
-import org.oasis_open.docs.wsn.b_2.GetCurrentMessage;
-import org.oasis_open.docs.wsn.b_2.GetCurrentMessageResponse;
-import org.oasis_open.docs.wsn.b_2.NoCurrentMessageOnTopicFaultType;
-import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
-import org.oasis_open.docs.wsn.b_2.Notify;
-import org.oasis_open.docs.wsn.b_2.Subscribe;
-import org.oasis_open.docs.wsn.b_2.SubscribeCreationFailedFaultType;
-import org.oasis_open.docs.wsn.b_2.SubscribeResponse;
-import org.oasis_open.docs.wsn.br_2.PublisherRegistrationFailedFaultType;
-import org.oasis_open.docs.wsn.br_2.RegisterPublisher;
-import org.oasis_open.docs.wsn.br_2.RegisterPublisherResponse;
-import org.w3._2005._08.addressing.EndpointReferenceType;
-
-@WebService(endpointInterface = "org.apache.servicemix.wsn.jaxws.NotificationBroker")
-public abstract class AbstractNotificationBroker extends AbstractEndpoint implements NotificationBroker {
-
- private static Log log = LogFactory.getLog(AbstractNotificationBroker.class);
-
- private IdGenerator idGenerator;
- private AbstractPublisher anonymousPublisher;
- private Map<String,AbstractPublisher> publishers;
- private Map<String,AbstractSubscription> subscriptions;
-
- public AbstractNotificationBroker(String name) {
- super(name);
- idGenerator = new IdGenerator();
- subscriptions = new ConcurrentHashMap<String,AbstractSubscription>();
- publishers = new ConcurrentHashMap<String, AbstractPublisher>();
- }
-
- public void init() throws Exception {
- register();
- anonymousPublisher = createPublisher("Anonymous");
- anonymousPublisher.register();
- }
-
- protected String createAddress() {
- return "http://servicemix.org/wsnotification/NotificationBroker/" + getName();
- }
-
- /**
- *
- * @param notify
- */
- @WebMethod(operationName = "Notify")
- @Oneway
- public void notify(
- @WebParam(name = "Notify", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "Notify")
- Notify notify) {
-
- log.debug("Notify");
- handleNotify(notify);
- }
-
- protected void handleNotify(Notify notify) {
- for (NotificationMessageHolderType messageHolder : notify.getNotificationMessage()) {
- EndpointReferenceType producerReference = messageHolder.getProducerReference();
- AbstractPublisher publisher = getPublisher(producerReference);
- if (publisher != null) {
- publisher.notify(messageHolder);
- }
- }
- }
-
- protected AbstractPublisher getPublisher(EndpointReferenceType producerReference) {
- AbstractPublisher publisher = null;
- if (producerReference != null &&
- producerReference.getAddress() != null &&
- producerReference.getAddress().getValue() != null) {
- String address = producerReference.getAddress().getValue();
- publishers.get(address);
- }
- if (publisher == null) {
- publisher = anonymousPublisher;
- }
- return publisher;
- }
-
- /**
- *
- * @param subscribeRequest
- * @return
- * returns org.oasis_open.docs.wsn.b_1.SubscribeResponse
- * @throws SubscribeCreationFailedFault
- * @throws InvalidTopicExpressionFault
- * @throws TopicNotSupportedFault
- * @throws InvalidFilterFault
- * @throws InvalidProducerPropertiesExpressionFault
- * @throws ResourceUnknownFault
- * @throws InvalidUseRawValueFault
- * @throws InvalidMessageContentExpressionFault
- * @throws TopicExpressionDialectUnknownFault
- * @throws UnacceptableInitialTerminationTimeFault
- */
- @WebMethod(operationName = "Subscribe")
- @WebResult(name = "SubscribeResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "SubscribeResponse")
- public SubscribeResponse subscribe(
- @WebParam(name = "Subscribe", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "SubscribeRequest")
- Subscribe subscribeRequest)
- throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, ResourceUnknownFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault {
-
- log.debug("Subscribe");
- return handleSubscribe(subscribeRequest, null);
- }
-
- public SubscribeResponse handleSubscribe(Subscribe subscribeRequest,
- EndpointManager manager) throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault {
- AbstractSubscription subscription = null;
- boolean success = false;
- try {
- subscription = createSubcription(idGenerator.generateSanitizedId());
- subscription.setBroker(this);
- subscriptions.put(subscription.getAddress(), subscription);
- subscription.create(subscribeRequest);
- if (manager != null) {
- subscription.setManager(manager);
- }
- subscription.register();
- SubscribeResponse response = new SubscribeResponse();
- response.setSubscriptionReference(createEndpointReference(subscription.getAddress()));
- success = true;
- return response;
- } catch (EndpointRegistrationException e) {
- SubscribeCreationFailedFaultType fault = new SubscribeCreationFailedFaultType();
- throw new SubscribeCreationFailedFault("Unable to register endpoint", fault, e);
- } finally {
- if (!success && subscription != null) {
- subscriptions.remove(subscription);
- try {
- subscription.unsubscribe();
- } catch (UnableToDestroySubscriptionFault e) {
- log.info("Error destroying subscription", e);
- }
- }
- }
- }
-
- public void unsubscribe(String address) throws UnableToDestroySubscriptionFault {
- AbstractSubscription subscription = (AbstractSubscription) subscriptions.remove(address);
- if (subscription != null) {
- subscription.unsubscribe();
- }
- }
-
- /**
- *
- * @param getCurrentMessageRequest
- * @return
- * returns org.oasis_open.docs.wsn.b_1.GetCurrentMessageResponse
- * @throws MultipleTopicsSpecifiedFault
- * @throws TopicNotSupportedFault
- * @throws InvalidTopicExpressionFault
- * @throws ResourceUnknownFault
- * @throws TopicExpressionDialectUnknownFault
- * @throws NoCurrentMessageOnTopicFault
- */
- @WebMethod(operationName = "GetCurrentMessage")
- @WebResult(name = "GetCurrentMessageResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "GetCurrentMessageResponse")
- public GetCurrentMessageResponse getCurrentMessage(
- @WebParam(name = "GetCurrentMessage", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "GetCurrentMessageRequest")
- GetCurrentMessage getCurrentMessageRequest)
- throws InvalidTopicExpressionFault, MultipleTopicsSpecifiedFault, NoCurrentMessageOnTopicFault, ResourceUnknownFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault {
-
- log.debug("GetCurrentMessage");
- NoCurrentMessageOnTopicFaultType fault = new NoCurrentMessageOnTopicFaultType();
- throw new NoCurrentMessageOnTopicFault("There is no current message on this topic.", fault);
- }
-
- /**
- *
- * @param registerPublisherRequest
- * @return
- * returns org.oasis_open.docs.wsn.br_1.RegisterPublisherResponse
- * @throws PublisherRegistrationRejectedFault
- * @throws InvalidTopicExpressionFault
- * @throws TopicNotSupportedFault
- * @throws ResourceUnknownFault
- * @throws PublisherRegistrationFailedFault
- */
- @WebMethod(operationName = "RegisterPublisher")
- @WebResult(name = "RegisterPublisherResponse", targetNamespace = "http://docs.oasis-open.org/wsn/br-1", partName = "RegisterPublisherResponse")
- public RegisterPublisherResponse registerPublisher(
- @WebParam(name = "RegisterPublisher", targetNamespace = "http://docs.oasis-open.org/wsn/br-1", partName = "RegisterPublisherRequest")
- RegisterPublisher registerPublisherRequest)
- throws InvalidTopicExpressionFault, PublisherRegistrationFailedFault, PublisherRegistrationRejectedFault, ResourceUnknownFault, TopicNotSupportedFault {
-
- log.debug("RegisterPublisher");
- return handleRegisterPublisher(registerPublisherRequest, null);
- }
-
- public RegisterPublisherResponse handleRegisterPublisher(
- RegisterPublisher registerPublisherRequest,
- EndpointManager manager) throws InvalidTopicExpressionFault,
- PublisherRegistrationFailedFault,
- PublisherRegistrationRejectedFault,
- ResourceUnknownFault,
- TopicNotSupportedFault {
- AbstractPublisher publisher = null;
- boolean success = false;
- try {
- publisher = createPublisher(idGenerator.generateSanitizedId());
- publishers.put(publisher.getAddress(), publisher);
- if (manager != null) {
- publisher.setManager(manager);
- }
- publisher.register();
- publisher.create(registerPublisherRequest);
- RegisterPublisherResponse response = new RegisterPublisherResponse();
- response.setPublisherRegistrationReference(createEndpointReference(publisher.getAddress()));
- success = true;
- return response;
- } catch (EndpointRegistrationException e) {
- PublisherRegistrationFailedFaultType fault = new PublisherRegistrationFailedFaultType();
- throw new PublisherRegistrationFailedFault("Unable to register new endpoint", fault, e);
- } finally {
- if (!success && publisher != null) {
- publishers.remove(publisher.getAddress());
- try {
- publisher.destroy();
- } catch (ResourceNotDestroyedFault e) {
- log.info("Error destroying publisher", e);
- }
- }
- }
- }
-
- protected abstract AbstractPublisher createPublisher(String name);
-
- protected abstract AbstractSubscription createSubcription(String name);
-
-}
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.wsn;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.jws.Oneway;
+import javax.jws.WebMethod;
+import javax.jws.WebParam;
+import javax.jws.WebResult;
+import javax.jws.WebService;
+
+import org.apache.activemq.util.IdGenerator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.wsn.jaxws.InvalidFilterFault;
+import org.apache.servicemix.wsn.jaxws.InvalidMessageContentExpressionFault;
+import org.apache.servicemix.wsn.jaxws.InvalidProducerPropertiesExpressionFault;
+import org.apache.servicemix.wsn.jaxws.InvalidTopicExpressionFault;
+import org.apache.servicemix.wsn.jaxws.MultipleTopicsSpecifiedFault;
+import org.apache.servicemix.wsn.jaxws.NoCurrentMessageOnTopicFault;
+import org.apache.servicemix.wsn.jaxws.NotificationBroker;
+import org.apache.servicemix.wsn.jaxws.PublisherRegistrationFailedFault;
+import org.apache.servicemix.wsn.jaxws.PublisherRegistrationRejectedFault;
+import org.apache.servicemix.wsn.jaxws.ResourceNotDestroyedFault;
+import org.apache.servicemix.wsn.jaxws.ResourceUnknownFault;
+import org.apache.servicemix.wsn.jaxws.SubscribeCreationFailedFault;
+import org.apache.servicemix.wsn.jaxws.TopicExpressionDialectUnknownFault;
+import org.apache.servicemix.wsn.jaxws.TopicNotSupportedFault;
+import org.apache.servicemix.wsn.jaxws.UnableToDestroySubscriptionFault;
+import org.apache.servicemix.wsn.jaxws.UnacceptableInitialTerminationTimeFault;
+import org.oasis_open.docs.wsn.b_2.GetCurrentMessage;
+import org.oasis_open.docs.wsn.b_2.GetCurrentMessageResponse;
+import org.oasis_open.docs.wsn.b_2.NoCurrentMessageOnTopicFaultType;
+import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
+import org.oasis_open.docs.wsn.b_2.Notify;
+import org.oasis_open.docs.wsn.b_2.Subscribe;
+import org.oasis_open.docs.wsn.b_2.SubscribeCreationFailedFaultType;
+import org.oasis_open.docs.wsn.b_2.SubscribeResponse;
+import org.oasis_open.docs.wsn.br_2.PublisherRegistrationFailedFaultType;
+import org.oasis_open.docs.wsn.br_2.RegisterPublisher;
+import org.oasis_open.docs.wsn.br_2.RegisterPublisherResponse;
+import org.w3._2005._08.addressing.EndpointReferenceType;
+
+@WebService(endpointInterface = "org.apache.servicemix.wsn.jaxws.NotificationBroker")
+public abstract class AbstractNotificationBroker extends AbstractEndpoint implements NotificationBroker {
+
+ private static Log log = LogFactory.getLog(AbstractNotificationBroker.class);
+
+ private IdGenerator idGenerator;
+ private AbstractPublisher anonymousPublisher;
+ private Map<String,AbstractPublisher> publishers;
+ private Map<String,AbstractSubscription> subscriptions;
+
+ public AbstractNotificationBroker(String name) {
+ super(name);
+ idGenerator = new IdGenerator();
+ subscriptions = new ConcurrentHashMap<String,AbstractSubscription>();
+ publishers = new ConcurrentHashMap<String, AbstractPublisher>();
+ }
+
+ public void init() throws Exception {
+ register();
+ anonymousPublisher = createPublisher("Anonymous");
+ anonymousPublisher.register();
+ }
+
+ protected String createAddress() {
+ return "http://servicemix.org/wsnotification/NotificationBroker/" + getName();
+ }
+
+ /**
+ *
+ * @param notify
+ */
+ @WebMethod(operationName = "Notify")
+ @Oneway
+ public void notify(
+ @WebParam(name = "Notify", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "Notify")
+ Notify notify) {
+
+ log.debug("Notify");
+ handleNotify(notify);
+ }
+
+ protected void handleNotify(Notify notify) {
+ for (NotificationMessageHolderType messageHolder : notify.getNotificationMessage()) {
+ EndpointReferenceType producerReference = messageHolder.getProducerReference();
+ AbstractPublisher publisher = getPublisher(producerReference);
+ if (publisher != null) {
+ publisher.notify(messageHolder);
+ }
+ }
+ }
+
+ /**
+ * Resolves the publisher that should handle a message coming from the given producer.
+ *
+ * @param producerReference endpoint reference of the message producer; may be null
+ * @return the publisher stored in the publishers map under the reference's address value,
+ * or the anonymous publisher when the reference, its address or its address value is null,
+ * or when no publisher is stored under that address
+ */
+ protected AbstractPublisher getPublisher(EndpointReferenceType producerReference) {
+ AbstractPublisher publisher = null;
+ if (producerReference != null &&
+ producerReference.getAddress() != null &&
+ producerReference.getAddress().getValue() != null) {
+ String address = producerReference.getAddress().getValue();
+ // Keep the lookup result; it used to be discarded, so the anonymous publisher was always returned.
+ publisher = publishers.get(address);
+ }
+ if (publisher == null) {
+ publisher = anonymousPublisher;
+ }
+ return publisher;
+ }
+
+ /**
+ *
+ * @param subscribeRequest
+ * @return
+ * returns org.oasis_open.docs.wsn.b_1.SubscribeResponse
+ * @throws SubscribeCreationFailedFault
+ * @throws InvalidTopicExpressionFault
+ * @throws TopicNotSupportedFault
+ * @throws InvalidFilterFault
+ * @throws InvalidProducerPropertiesExpressionFault
+ * @throws ResourceUnknownFault
+ * @throws InvalidUseRawValueFault
+ * @throws InvalidMessageContentExpressionFault
+ * @throws TopicExpressionDialectUnknownFault
+ * @throws UnacceptableInitialTerminationTimeFault
+ */
+ @WebMethod(operationName = "Subscribe")
+ @WebResult(name = "SubscribeResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "SubscribeResponse")
+ public SubscribeResponse subscribe(
+ @WebParam(name = "Subscribe", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "SubscribeRequest")
+ Subscribe subscribeRequest)
+ throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, ResourceUnknownFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault {
+
+ log.debug("Subscribe");
+ return handleSubscribe(subscribeRequest, null);
+ }
+
+ /**
+ * Creates a subscription for the given request, stores it in the subscriptions map under its
+ * address, registers it, and returns a response referencing that address.
+ *
+ * If the method does not complete successfully, the subscription is removed from the map again
+ * and unsubscribed; a failure while unsubscribing is only logged.
+ *
+ * @param subscribeRequest the subscribe request passed to the new subscription's create method
+ * @param manager endpoint manager to set on the subscription; ignored when null
+ * @return a response whose subscription reference points at the new subscription's address
+ * @throws SubscribeCreationFailedFault if registering the subscription endpoint fails
+ */
+ public SubscribeResponse handleSubscribe(Subscribe subscribeRequest,
+ EndpointManager manager) throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault {
+ AbstractSubscription subscription = null;
+ boolean success = false;
+ try {
+ subscription = createSubcription(idGenerator.generateSanitizedId());
+ subscription.setBroker(this);
+ subscriptions.put(subscription.getAddress(), subscription);
+ subscription.create(subscribeRequest);
+ if (manager != null) {
+ subscription.setManager(manager);
+ }
+ subscription.register();
+ SubscribeResponse response = new SubscribeResponse();
+ response.setSubscriptionReference(createEndpointReference(subscription.getAddress()));
+ success = true;
+ return response;
+ } catch (EndpointRegistrationException e) {
+ SubscribeCreationFailedFaultType fault = new SubscribeCreationFailedFaultType();
+ throw new SubscribeCreationFailedFault("Unable to register endpoint", fault, e);
+ } finally {
+ if (!success && subscription != null) {
+ // The map is keyed by address, so remove by address; removing by the subscription
+ // object itself never matched a key and left the failed entry in the map.
+ subscriptions.remove(subscription.getAddress());
+ try {
+ subscription.unsubscribe();
+ } catch (UnableToDestroySubscriptionFault e) {
+ log.info("Error destroying subscription", e);
+ }
+ }
+ }
+ }
+
+ public void unsubscribe(String address) throws UnableToDestroySubscriptionFault {
+ AbstractSubscription subscription = (AbstractSubscription) subscriptions.remove(address);
+ if (subscription != null) {
+ subscription.unsubscribe();
+ }
+ }
+
+ /**
+ *
+ * @param getCurrentMessageRequest
+ * @return
+ * returns org.oasis_open.docs.wsn.b_1.GetCurrentMessageResponse
+ * @throws MultipleTopicsSpecifiedFault
+ * @throws TopicNotSupportedFault
+ * @throws InvalidTopicExpressionFault
+ * @throws ResourceUnknownFault
+ * @throws TopicExpressionDialectUnknownFault
+ * @throws NoCurrentMessageOnTopicFault
+ */
+ @WebMethod(operationName = "GetCurrentMessage")
+ @WebResult(name = "GetCurrentMessageResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "GetCurrentMessageResponse")
+ public GetCurrentMessageResponse getCurrentMessage(
+ @WebParam(name = "GetCurrentMessage", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "GetCurrentMessageRequest")
+ GetCurrentMessage getCurrentMessageRequest)
+ throws InvalidTopicExpressionFault, MultipleTopicsSpecifiedFault, NoCurrentMessageOnTopicFault, ResourceUnknownFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault {
+
+ log.debug("GetCurrentMessage");
+ NoCurrentMessageOnTopicFaultType fault = new NoCurrentMessageOnTopicFaultType();
+ throw new NoCurrentMessageOnTopicFault("There is no current message on this topic.", fault);
+ }
+
+ /**
+ *
+ * @param registerPublisherRequest
+ * @return
+ * returns org.oasis_open.docs.wsn.br_1.RegisterPublisherResponse
+ * @throws PublisherRegistrationRejectedFault
+ * @throws InvalidTopicExpressionFault
+ * @throws TopicNotSupportedFault
+ * @throws ResourceUnknownFault
+ * @throws PublisherRegistrationFailedFault
+ */
+ @WebMethod(operationName = "RegisterPublisher")
+ @WebResult(name = "RegisterPublisherResponse", targetNamespace = "http://docs.oasis-open.org/wsn/br-1", partName = "RegisterPublisherResponse")
+ public RegisterPublisherResponse registerPublisher(
+ @WebParam(name = "RegisterPublisher", targetNamespace = "http://docs.oasis-open.org/wsn/br-1", partName = "RegisterPublisherRequest")
+ RegisterPublisher registerPublisherRequest)
+ throws InvalidTopicExpressionFault, PublisherRegistrationFailedFault, PublisherRegistrationRejectedFault, ResourceUnknownFault, TopicNotSupportedFault {
+
+ log.debug("RegisterPublisher");
+ return handleRegisterPublisher(registerPublisherRequest, null);
+ }
+
+ public RegisterPublisherResponse handleRegisterPublisher(
+ RegisterPublisher registerPublisherRequest,
+ EndpointManager manager) throws InvalidTopicExpressionFault,
+ PublisherRegistrationFailedFault,
+ PublisherRegistrationRejectedFault,
+ ResourceUnknownFault,
+ TopicNotSupportedFault {
+ AbstractPublisher publisher = null;
+ boolean success = false;
+ try {
+ publisher = createPublisher(idGenerator.generateSanitizedId());
+ publishers.put(publisher.getAddress(), publisher);
+ if (manager != null) {
+ publisher.setManager(manager);
+ }
+ publisher.register();
+ publisher.create(registerPublisherRequest);
+ RegisterPublisherResponse response = new RegisterPublisherResponse();
+ response.setPublisherRegistrationReference(createEndpointReference(publisher.getAddress()));
+ success = true;
+ return response;
+ } catch (EndpointRegistrationException e) {
+ PublisherRegistrationFailedFaultType fault = new PublisherRegistrationFailedFaultType();
+ throw new PublisherRegistrationFailedFault("Unable to register new endpoint", fault, e);
+ } finally {
+ if (!success && publisher != null) {
+ publishers.remove(publisher.getAddress());
+ try {
+ publisher.destroy();
+ } catch (ResourceNotDestroyedFault e) {
+ log.info("Error destroying publisher", e);
+ }
+ }
+ }
+ }
+
+ protected abstract AbstractPublisher createPublisher(String name);
+
+ protected abstract AbstractSubscription createSubcription(String name);
+
+}
Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractNotificationBroker.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractPublisher.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractPublisher.java?rev=379627&r1=379626&r2=379627&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractPublisher.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractPublisher.java Tue Feb 21 15:40:05 2006
@@ -1,114 +1,114 @@
-/*
- * Copyright 2005-2006 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicemix.wsn;
-
-import java.util.List;
-
-import javax.jws.WebMethod;
-import javax.jws.WebParam;
-import javax.jws.WebResult;
-import javax.jws.WebService;
-
-import org.oasis_open.docs.wsn.b_2.InvalidTopicExpressionFaultType;
-import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
-import org.oasis_open.docs.wsn.b_2.TopicExpressionType;
-import org.oasis_open.docs.wsn.br_2.DestroyRegistration;
-import org.oasis_open.docs.wsn.br_2.DestroyRegistrationResponse;
-import org.oasis_open.docs.wsn.br_2.PublisherRegistrationFailedFaultType;
-import org.oasis_open.docs.wsn.br_2.RegisterPublisher;
-import org.oasis_open.docs.wsn.br_2.ResourceNotDestroyedFaultType;
-import org.apache.servicemix.wsn.jaxws.InvalidTopicExpressionFault;
-import org.apache.servicemix.wsn.jaxws.PublisherRegistrationFailedFault;
-import org.apache.servicemix.wsn.jaxws.PublisherRegistrationManager;
-import org.apache.servicemix.wsn.jaxws.PublisherRegistrationRejectedFault;
-import org.apache.servicemix.wsn.jaxws.ResourceNotDestroyedFault;
-import org.apache.servicemix.wsn.jaxws.ResourceUnknownFault;
-import org.apache.servicemix.wsn.jaxws.TopicNotSupportedFault;
-import org.w3._2005._08.addressing.EndpointReferenceType;
-
-@WebService(endpointInterface = "org.apache.servicemix.wsn.jaxws.PublisherRegistrationManager")
-public abstract class AbstractPublisher extends AbstractEndpoint
- implements PublisherRegistrationManager {
-
- protected EndpointReferenceType publisherReference;
- protected boolean demand;
- protected List<TopicExpressionType> topic;
-
- public AbstractPublisher(String name) {
- super(name);
- }
-
- /**
- *
- * @param destroyRequest
- * @return
- * returns org.oasis_open.docs.wsn.br_1.DestroyResponse
- * @throws ResourceNotDestroyedFault
- * @throws ResourceUnknownFault
- */
- @WebMethod(operationName = "DestroyRegistration")
- @WebResult(name = "DestroyRegistrationResponse", targetNamespace = "http://docs.oasis-open.org/wsn/br-2", partName = "DestroyRegistrationResponse")
- public DestroyRegistrationResponse destroyRegistration(
- @WebParam(name = "DestroyRegistration", targetNamespace = "http://docs.oasis-open.org/wsn/br-2", partName = "DestroyRegistrationRequest")
- DestroyRegistration destroyRegistrationRequest)
- throws ResourceNotDestroyedFault, ResourceUnknownFault {
-
- destroy();
- return new DestroyRegistrationResponse();
- }
-
- public abstract void notify(NotificationMessageHolderType messageHolder);
-
- protected void destroy() throws ResourceNotDestroyedFault {
- try {
- unregister();
- } catch (EndpointRegistrationException e) {
- ResourceNotDestroyedFaultType fault = new ResourceNotDestroyedFaultType();
- throw new ResourceNotDestroyedFault("Error unregistering endpoint", fault, e);
- }
- }
-
- protected String createAddress() {
- return "http://servicemix.org/wsnotification/Publisher/" + getName();
- }
-
- public void create(RegisterPublisher registerPublisherRequest) throws InvalidTopicExpressionFault, PublisherRegistrationFailedFault, PublisherRegistrationRejectedFault, ResourceUnknownFault, TopicNotSupportedFault {
- validatePublisher(registerPublisherRequest);
- start();
- }
-
- protected void validatePublisher(RegisterPublisher registerPublisherRequest) throws InvalidTopicExpressionFault, PublisherRegistrationFailedFault, PublisherRegistrationRejectedFault, ResourceUnknownFault, TopicNotSupportedFault {
- // Check consumer reference
- publisherReference = registerPublisherRequest.getPublisherReference();
- // Check topic
- topic = registerPublisherRequest.getTopic();
- // Check demand based
- demand = registerPublisherRequest.isDemand() != null ? registerPublisherRequest.isDemand().booleanValue() : false;
- // Check all parameters
- if (publisherReference == null) {
- PublisherRegistrationFailedFaultType fault = new PublisherRegistrationFailedFaultType();
- throw new PublisherRegistrationFailedFault("Invalid PublisherReference: null", fault);
- }
- if (demand) {
- if (topic == null || topic.size() == 0) {
- InvalidTopicExpressionFaultType fault = new InvalidTopicExpressionFaultType();
- throw new InvalidTopicExpressionFault("Must specify at least one topic for demand-based publishing", fault);
- }
- }
- }
-
- protected abstract void start() throws PublisherRegistrationFailedFault;
-}
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.wsn;
+
+import java.util.List;
+
+import javax.jws.WebMethod;
+import javax.jws.WebParam;
+import javax.jws.WebResult;
+import javax.jws.WebService;
+
+import org.oasis_open.docs.wsn.b_2.InvalidTopicExpressionFaultType;
+import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
+import org.oasis_open.docs.wsn.b_2.TopicExpressionType;
+import org.oasis_open.docs.wsn.br_2.DestroyRegistration;
+import org.oasis_open.docs.wsn.br_2.DestroyRegistrationResponse;
+import org.oasis_open.docs.wsn.br_2.PublisherRegistrationFailedFaultType;
+import org.oasis_open.docs.wsn.br_2.RegisterPublisher;
+import org.oasis_open.docs.wsn.br_2.ResourceNotDestroyedFaultType;
+import org.apache.servicemix.wsn.jaxws.InvalidTopicExpressionFault;
+import org.apache.servicemix.wsn.jaxws.PublisherRegistrationFailedFault;
+import org.apache.servicemix.wsn.jaxws.PublisherRegistrationManager;
+import org.apache.servicemix.wsn.jaxws.PublisherRegistrationRejectedFault;
+import org.apache.servicemix.wsn.jaxws.ResourceNotDestroyedFault;
+import org.apache.servicemix.wsn.jaxws.ResourceUnknownFault;
+import org.apache.servicemix.wsn.jaxws.TopicNotSupportedFault;
+import org.w3._2005._08.addressing.EndpointReferenceType;
+
+@WebService(endpointInterface = "org.apache.servicemix.wsn.jaxws.PublisherRegistrationManager")
+public abstract class AbstractPublisher extends AbstractEndpoint
+ implements PublisherRegistrationManager {
+
+ protected EndpointReferenceType publisherReference;
+ protected boolean demand;
+ protected List<TopicExpressionType> topic;
+
+ public AbstractPublisher(String name) {
+ super(name);
+ }
+
+ /**
+ *
+ * @param destroyRegistrationRequest
+ * @return
+ * returns org.oasis_open.docs.wsn.br_2.DestroyRegistrationResponse
+ * @throws ResourceNotDestroyedFault
+ * @throws ResourceUnknownFault
+ */
+ @WebMethod(operationName = "DestroyRegistration")
+ @WebResult(name = "DestroyRegistrationResponse", targetNamespace = "http://docs.oasis-open.org/wsn/br-2", partName = "DestroyRegistrationResponse")
+ public DestroyRegistrationResponse destroyRegistration(
+ @WebParam(name = "DestroyRegistration", targetNamespace = "http://docs.oasis-open.org/wsn/br-2", partName = "DestroyRegistrationRequest")
+ DestroyRegistration destroyRegistrationRequest)
+ throws ResourceNotDestroyedFault, ResourceUnknownFault {
+
+ destroy();
+ return new DestroyRegistrationResponse();
+ }
+
+ public abstract void notify(NotificationMessageHolderType messageHolder);
+
+ protected void destroy() throws ResourceNotDestroyedFault {
+ try {
+ unregister();
+ } catch (EndpointRegistrationException e) {
+ ResourceNotDestroyedFaultType fault = new ResourceNotDestroyedFaultType();
+ throw new ResourceNotDestroyedFault("Error unregistering endpoint", fault, e);
+ }
+ }
+
+ protected String createAddress() {
+ return "http://servicemix.org/wsnotification/Publisher/" + getName();
+ }
+
+ public void create(RegisterPublisher registerPublisherRequest) throws InvalidTopicExpressionFault, PublisherRegistrationFailedFault, PublisherRegistrationRejectedFault, ResourceUnknownFault, TopicNotSupportedFault {
+ validatePublisher(registerPublisherRequest);
+ start();
+ }
+
+ protected void validatePublisher(RegisterPublisher registerPublisherRequest) throws InvalidTopicExpressionFault, PublisherRegistrationFailedFault, PublisherRegistrationRejectedFault, ResourceUnknownFault, TopicNotSupportedFault {
+ // Check publisher reference
+ publisherReference = registerPublisherRequest.getPublisherReference();
+ // Check topic
+ topic = registerPublisherRequest.getTopic();
+ // Check demand based
+ demand = registerPublisherRequest.isDemand() != null ? registerPublisherRequest.isDemand().booleanValue() : false;
+ // Check all parameters
+ if (publisherReference == null) {
+ PublisherRegistrationFailedFaultType fault = new PublisherRegistrationFailedFaultType();
+ throw new PublisherRegistrationFailedFault("Invalid PublisherReference: null", fault);
+ }
+ if (demand) {
+ if (topic == null || topic.size() == 0) {
+ InvalidTopicExpressionFaultType fault = new InvalidTopicExpressionFaultType();
+ throw new InvalidTopicExpressionFault("Must specify at least one topic for demand-based publishing", fault);
+ }
+ }
+ }
+
+ protected abstract void start() throws PublisherRegistrationFailedFault;
+}
Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractPublisher.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractPullPoint.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractPullPoint.java?rev=379627&r1=379626&r2=379627&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractPullPoint.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractPullPoint.java Tue Feb 21 15:40:05 2006
@@ -1,139 +1,139 @@
-/*
- * Copyright 2005-2006 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicemix.wsn;
-
-import java.math.BigInteger;
-import java.util.List;
-
-import javax.jws.Oneway;
-import javax.jws.WebMethod;
-import javax.jws.WebParam;
-import javax.jws.WebResult;
-import javax.jws.WebService;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.oasis_open.docs.wsn.b_2.CreatePullPoint;
-import org.oasis_open.docs.wsn.b_2.DestroyPullPoint;
-import org.oasis_open.docs.wsn.b_2.DestroyPullPointResponse;
-import org.oasis_open.docs.wsn.b_2.GetMessages;
-import org.oasis_open.docs.wsn.b_2.GetMessagesResponse;
-import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
-import org.oasis_open.docs.wsn.b_2.Notify;
-import org.oasis_open.docs.wsn.b_2.UnableToDestroyPullPointFaultType;
-import org.apache.servicemix.wsn.jaxws.NotificationConsumer;
-import org.apache.servicemix.wsn.jaxws.PullPoint;
-import org.apache.servicemix.wsn.jaxws.ResourceUnknownFault;
-import org.apache.servicemix.wsn.jaxws.UnableToCreatePullPointFault;
-import org.apache.servicemix.wsn.jaxws.UnableToDestroyPullPointFault;
-
-@WebService(endpointInterface = "org.apache.servicemix.wsn.PullPointConsumer")
-public abstract class AbstractPullPoint extends AbstractEndpoint
- implements PullPoint, NotificationConsumer {
-
- private static Log log = LogFactory.getLog(AbstractPullPoint.class);
-
- protected AbstractCreatePullPoint createPullPoint;
-
- public AbstractPullPoint(String name) {
- super(name);
- }
-
- /**
- *
- * @param notify
- */
- @WebMethod(operationName = "Notify")
- @Oneway
- public void notify(
- @WebParam(name = "Notify", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "Notify")
- Notify notify) {
-
- log.debug("Notify");
- for (NotificationMessageHolderType messageHolder : notify.getNotificationMessage()) {
- store(messageHolder);
- }
- }
-
- /**
- *
- * @param getMessagesRequest
- * @return
- * returns org.oasis_open.docs.wsn.b_1.GetMessagesResponse
- * @throws ResourceUnknownFault
- */
- @WebMethod(operationName = "GetMessages")
- @WebResult(name = "GetMessagesResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "GetMessagesResponse")
- public GetMessagesResponse getMessages(
- @WebParam(name = "GetMessages", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "GetMessagesRequest")
- GetMessages getMessagesRequest)
- throws ResourceUnknownFault {
-
- log.debug("GetMessages");
- BigInteger max = getMessagesRequest.getMaximumNumber();
- List<NotificationMessageHolderType> messages = getMessages(max != null ? max.intValue() : 0);
- GetMessagesResponse response = new GetMessagesResponse();
- response.getNotificationMessage().addAll(messages);
- return response;
- }
-
- /**
- *
- * @param destroyRequest
- * @return
- * returns org.oasis_open.docs.wsn.b_1.DestroyResponse
- * @throws UnableToDestroyPullPoint
- */
- @WebMethod(operationName = "DestroyPullPoint")
- @WebResult(name = "DestroyPullPointResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "DestroyPullPointResponse")
- public DestroyPullPointResponse destroyPullPoint(
- @WebParam(name = "DestroyPullPoint", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "DestroyPullPointRequest")
- DestroyPullPoint destroyPullPointRequest)
- throws UnableToDestroyPullPointFault {
-
- log.debug("Destroy");
- createPullPoint.destroyPullPoint(getAddress());
- return new DestroyPullPointResponse();
- }
-
- public void create(CreatePullPoint createPullPointRequest) throws UnableToCreatePullPointFault {
- }
-
- protected abstract void store(NotificationMessageHolderType messageHolder);
-
- protected abstract List<NotificationMessageHolderType> getMessages(int max) throws ResourceUnknownFault;
-
- protected void destroy() throws UnableToDestroyPullPointFault {
- try {
- unregister();
- } catch (EndpointRegistrationException e) {
- UnableToDestroyPullPointFaultType fault = new UnableToDestroyPullPointFaultType();
- throw new UnableToDestroyPullPointFault("Error unregistering endpoint", fault, e);
- }
- }
-
- protected String createAddress() {
- return "http://servicemix.org/wsnotification/PullPoint/" + getName();
- }
-
- public AbstractCreatePullPoint getCreatePullPoint() {
- return createPullPoint;
- }
-
- public void setCreatePullPoint(AbstractCreatePullPoint createPullPoint) {
- this.createPullPoint = createPullPoint;
- }
-}
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.wsn;
+
+import java.math.BigInteger;
+import java.util.List;
+
+import javax.jws.Oneway;
+import javax.jws.WebMethod;
+import javax.jws.WebParam;
+import javax.jws.WebResult;
+import javax.jws.WebService;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.oasis_open.docs.wsn.b_2.CreatePullPoint;
+import org.oasis_open.docs.wsn.b_2.DestroyPullPoint;
+import org.oasis_open.docs.wsn.b_2.DestroyPullPointResponse;
+import org.oasis_open.docs.wsn.b_2.GetMessages;
+import org.oasis_open.docs.wsn.b_2.GetMessagesResponse;
+import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
+import org.oasis_open.docs.wsn.b_2.Notify;
+import org.oasis_open.docs.wsn.b_2.UnableToDestroyPullPointFaultType;
+import org.apache.servicemix.wsn.jaxws.NotificationConsumer;
+import org.apache.servicemix.wsn.jaxws.PullPoint;
+import org.apache.servicemix.wsn.jaxws.ResourceUnknownFault;
+import org.apache.servicemix.wsn.jaxws.UnableToCreatePullPointFault;
+import org.apache.servicemix.wsn.jaxws.UnableToDestroyPullPointFault;
+
+@WebService(endpointInterface = "org.apache.servicemix.wsn.PullPointConsumer")
+public abstract class AbstractPullPoint extends AbstractEndpoint
+ implements PullPoint, NotificationConsumer {
+
+ private static Log log = LogFactory.getLog(AbstractPullPoint.class);
+
+ protected AbstractCreatePullPoint createPullPoint; // no initial value; assigned through setCreatePullPoint()
+
+ public AbstractPullPoint(String name) {
+ super(name);
+ }
+
+ /**
+ * Passes every notification message carried by the request to store().
+ * @param notify the Notify request whose messages are stored
+ */
+ @WebMethod(operationName = "Notify")
+ @Oneway
+ public void notify(
+ @WebParam(name = "Notify", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "Notify")
+ Notify notify) {
+
+ log.debug("Notify"); // NOTE(review): Notify/GetMessages annotations name namespace b-1 while DestroyPullPoint names b-2 and the types come from package b_2 - confirm
+ for (NotificationMessageHolderType messageHolder : notify.getNotificationMessage()) {
+ store(messageHolder);
+ }
+ }
+
+ /**
+ * Wraps the result of getMessages(int) in a response; a missing maximum number is passed on as 0.
+ * @param getMessagesRequest request carrying the optional maximum number of messages
+ * @return
+ * returns org.oasis_open.docs.wsn.b_2.GetMessagesResponse
+ * @throws ResourceUnknownFault
+ */
+ @WebMethod(operationName = "GetMessages")
+ @WebResult(name = "GetMessagesResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "GetMessagesResponse")
+ public GetMessagesResponse getMessages(
+ @WebParam(name = "GetMessages", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "GetMessagesRequest")
+ GetMessages getMessagesRequest)
+ throws ResourceUnknownFault {
+
+ log.debug("GetMessages");
+ BigInteger max = getMessagesRequest.getMaximumNumber();
+ List<NotificationMessageHolderType> messages = getMessages(max != null ? max.intValue() : 0);
+ GetMessagesResponse response = new GetMessagesResponse();
+ response.getNotificationMessage().addAll(messages);
+ return response;
+ }
+
+ /**
+ * Delegates to createPullPoint (which must have been set) using this endpoint's address.
+ * @param destroyPullPointRequest
+ * @return
+ * returns org.oasis_open.docs.wsn.b_2.DestroyPullPointResponse
+ * @throws UnableToDestroyPullPointFault
+ */
+ @WebMethod(operationName = "DestroyPullPoint")
+ @WebResult(name = "DestroyPullPointResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "DestroyPullPointResponse")
+ public DestroyPullPointResponse destroyPullPoint(
+ @WebParam(name = "DestroyPullPoint", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "DestroyPullPointRequest")
+ DestroyPullPoint destroyPullPointRequest)
+ throws UnableToDestroyPullPointFault {
+
+ log.debug("Destroy");
+ createPullPoint.destroyPullPoint(getAddress());
+ return new DestroyPullPointResponse();
+ }
+
+ public void create(CreatePullPoint createPullPointRequest) throws UnableToCreatePullPointFault { // empty body in this base class
+ }
+
+ protected abstract void store(NotificationMessageHolderType messageHolder);
+
+ protected abstract List<NotificationMessageHolderType> getMessages(int max) throws ResourceUnknownFault;
+
+ protected void destroy() throws UnableToDestroyPullPointFault {
+ try {
+ unregister(); // a registration failure is rethrown below as UnableToDestroyPullPointFault, keeping the cause
+ } catch (EndpointRegistrationException e) {
+ UnableToDestroyPullPointFaultType fault = new UnableToDestroyPullPointFaultType();
+ throw new UnableToDestroyPullPointFault("Error unregistering endpoint", fault, e);
+ }
+ }
+
+ protected String createAddress() {
+ return "http://servicemix.org/wsnotification/PullPoint/" + getName();
+ }
+
+ public AbstractCreatePullPoint getCreatePullPoint() {
+ return createPullPoint;
+ }
+
+ public void setCreatePullPoint(AbstractCreatePullPoint createPullPoint) {
+ this.createPullPoint = createPullPoint;
+ }
+}
Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractPullPoint.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractSubscription.java?rev=379627&r1=379626&r2=379627&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractSubscription.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractSubscription.java Tue Feb 21 15:40:05 2006
@@ -1,372 +1,372 @@
-/*
- * Copyright 2005-2006 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicemix.wsn;
-
-import java.util.GregorianCalendar;
-
-import javax.jws.WebMethod;
-import javax.jws.WebParam;
-import javax.jws.WebResult;
-import javax.jws.WebService;
-import javax.xml.bind.JAXBElement;
-import javax.xml.datatype.DatatypeConfigurationException;
-import javax.xml.datatype.DatatypeConstants;
-import javax.xml.datatype.DatatypeFactory;
-import javax.xml.datatype.Duration;
-import javax.xml.datatype.XMLGregorianCalendar;
-import javax.xml.namespace.QName;
-
-import org.oasis_open.docs.wsn.b_2.InvalidFilterFaultType;
-import org.oasis_open.docs.wsn.b_2.InvalidMessageContentExpressionFaultType;
-import org.oasis_open.docs.wsn.b_2.InvalidProducerPropertiesExpressionFaultType;
-import org.oasis_open.docs.wsn.b_2.InvalidTopicExpressionFaultType;
-import org.oasis_open.docs.wsn.b_2.PauseSubscription;
-import org.oasis_open.docs.wsn.b_2.PauseSubscriptionResponse;
-import org.oasis_open.docs.wsn.b_2.QueryExpressionType;
-import org.oasis_open.docs.wsn.b_2.Renew;
-import org.oasis_open.docs.wsn.b_2.RenewResponse;
-import org.oasis_open.docs.wsn.b_2.ResumeSubscription;
-import org.oasis_open.docs.wsn.b_2.ResumeSubscriptionResponse;
-import org.oasis_open.docs.wsn.b_2.Subscribe;
-import org.oasis_open.docs.wsn.b_2.SubscribeCreationFailedFaultType;
-import org.oasis_open.docs.wsn.b_2.TopicExpressionType;
-import org.oasis_open.docs.wsn.b_2.UnableToDestroySubscriptionFaultType;
-import org.oasis_open.docs.wsn.b_2.UnacceptableInitialTerminationTimeFaultType;
-import org.oasis_open.docs.wsn.b_2.UnacceptableTerminationTimeFaultType;
-import org.oasis_open.docs.wsn.b_2.Unsubscribe;
-import org.oasis_open.docs.wsn.b_2.UnsubscribeResponse;
-import org.oasis_open.docs.wsn.b_2.UseRaw;
-import org.apache.servicemix.wsn.jaxws.InvalidFilterFault;
-import org.apache.servicemix.wsn.jaxws.InvalidMessageContentExpressionFault;
-import org.apache.servicemix.wsn.jaxws.InvalidProducerPropertiesExpressionFault;
-import org.apache.servicemix.wsn.jaxws.InvalidTopicExpressionFault;
-import org.apache.servicemix.wsn.jaxws.PausableSubscriptionManager;
-import org.apache.servicemix.wsn.jaxws.PauseFailedFault;
-import org.apache.servicemix.wsn.jaxws.ResourceUnknownFault;
-import org.apache.servicemix.wsn.jaxws.ResumeFailedFault;
-import org.apache.servicemix.wsn.jaxws.SubscribeCreationFailedFault;
-import org.apache.servicemix.wsn.jaxws.TopicExpressionDialectUnknownFault;
-import org.apache.servicemix.wsn.jaxws.TopicNotSupportedFault;
-import org.apache.servicemix.wsn.jaxws.UnableToDestroySubscriptionFault;
-import org.apache.servicemix.wsn.jaxws.UnacceptableInitialTerminationTimeFault;
-import org.apache.servicemix.wsn.jaxws.UnacceptableTerminationTimeFault;
-import org.w3._2005._08.addressing.EndpointReferenceType;
-
-@WebService(endpointInterface = "org.apache.servicemix.wsn.jaxws.PausableSubscriptionManager")
-public abstract class AbstractSubscription extends AbstractEndpoint
- implements PausableSubscriptionManager {
-
- public static final String WSN_URI = "http://docs.oasis-open.org/wsn/b-2";
- public static final String XPATH1_URI = "http://www.w3.org/TR/1999/REC-xpath-19991116";
- public static final QName QNAME_TOPIC_EXPRESSION = new QName(WSN_URI, "TopicExpression");
- public static final QName QNAME_PRODUCER_PROPERTIES = new QName(WSN_URI, "ProducerProperties");
- public static final QName QNAME_MESSAGE_CONTENT = new QName(WSN_URI, "MessageContent");
- public static final QName QNAME_USE_RAW = new QName(WSN_URI, "UseRaw");
-
- protected DatatypeFactory datatypeFactory;
- protected XMLGregorianCalendar terminationTime;
- protected boolean useRaw;
- protected TopicExpressionType topic;
- protected QueryExpressionType contentFilter;
- protected EndpointReferenceType consumerReference;
- protected AbstractNotificationBroker broker;
-
- public AbstractSubscription(String name) {
- super(name);
- try {
- this.datatypeFactory = DatatypeFactory.newInstance();
- } catch (DatatypeConfigurationException e) {
- throw new RuntimeException("Unable to initialize subscription", e);
- }
- }
-
- /**
- *
- * @param renewRequest
- * @return
- * returns org.oasis_open.docs.wsn.b_1.RenewResponse
- * @throws UnacceptableTerminationTimeFault
- * @throws ResourceUnknownFault
- */
- @WebMethod(operationName = "Renew")
- @WebResult(name = "RenewResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "RenewResponse")
- public RenewResponse renew(
- @WebParam(name = "Renew", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "RenewRequest")
- Renew renewRequest)
- throws ResourceUnknownFault, UnacceptableTerminationTimeFault {
-
- XMLGregorianCalendar terminationTime = validateTerminationTime(renewRequest.getTerminationTime());
- renew(terminationTime);
- RenewResponse response = new RenewResponse();
- response.setTerminationTime(terminationTime);
- response.setCurrentTime(getCurrentTime());
- return response;
- }
-
- /**
- *
- * @param unsubscribeRequest
- * @return
- * returns org.oasis_open.docs.wsn.b_1.UnsubscribeResponse
- * @throws UnableToDestroySubscriptionFault
- * @throws ResourceUnknownFault
- */
- @WebMethod(operationName = "Unsubscribe")
- @WebResult(name = "UnsubscribeResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "UnsubscribeResponse")
- public UnsubscribeResponse unsubscribe(
- @WebParam(name = "Unsubscribe", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "UnsubscribeRequest")
- Unsubscribe unsubscribeRequest)
- throws ResourceUnknownFault, UnableToDestroySubscriptionFault {
-
- broker.unsubscribe(getAddress());
- return new UnsubscribeResponse();
- }
-
- /**
- *
- * @param pauseSubscriptionRequest
- * @return
- * returns org.oasis_open.docs.wsn.b_1.PauseSubscriptionResponse
- * @throws PauseFailedFault
- * @throws ResourceUnknownFault
- */
- @WebMethod(operationName = "PauseSubscription")
- @WebResult(name = "PauseSubscriptionResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "PauseSubscriptionResponse")
- public PauseSubscriptionResponse pauseSubscription(
- @WebParam(name = "PauseSubscription", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "PauseSubscriptionRequest")
- PauseSubscription pauseSubscriptionRequest)
- throws PauseFailedFault, ResourceUnknownFault {
-
- pause();
- return new PauseSubscriptionResponse();
- }
-
- /**
- *
- * @param resumeSubscriptionRequest
- * @return
- * returns org.oasis_open.docs.wsn.b_1.ResumeSubscriptionResponse
- * @throws ResumeFailedFault
- * @throws ResourceUnknownFault
- */
- @WebMethod(operationName = "ResumeSubscription")
- @WebResult(name = "ResumeSubscriptionResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "ResumeSubscriptionResponse")
- public ResumeSubscriptionResponse resumeSubscription(
- @WebParam(name = "ResumeSubscription", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "ResumeSubscriptionRequest")
- ResumeSubscription resumeSubscriptionRequest)
- throws ResourceUnknownFault, ResumeFailedFault {
-
- resume();
- return new ResumeSubscriptionResponse();
- }
-
- protected XMLGregorianCalendar validateInitialTerminationTime(String value) throws UnacceptableInitialTerminationTimeFault {
- XMLGregorianCalendar tt = parseTerminationTime(value);
- if (tt == null) {
- UnacceptableInitialTerminationTimeFaultType fault = new UnacceptableInitialTerminationTimeFaultType();
- throw new UnacceptableInitialTerminationTimeFault(
- "Unable to parse initial termination time: '" + value + "'",
- fault);
- }
- XMLGregorianCalendar ct = getCurrentTime();
- int c = tt.compare(ct);
- if (c == DatatypeConstants.LESSER || c == DatatypeConstants.EQUAL) {
- UnacceptableInitialTerminationTimeFaultType fault = new UnacceptableInitialTerminationTimeFaultType();
- fault.setMinimumTime(ct);
- throw new UnacceptableInitialTerminationTimeFault(
- "Invalid initial termination time",
- fault);
- }
- return tt;
- }
-
- protected XMLGregorianCalendar validateTerminationTime(String value) throws UnacceptableTerminationTimeFault {
- XMLGregorianCalendar tt = parseTerminationTime(value);
- if (tt == null) {
- UnacceptableTerminationTimeFaultType fault = new UnacceptableTerminationTimeFaultType();
- throw new UnacceptableTerminationTimeFault(
- "Unable to parse termination time: '" + value + "'",
- fault);
- }
- XMLGregorianCalendar ct = getCurrentTime();
- int c = tt.compare(ct);
- if (c == DatatypeConstants.LESSER || c == DatatypeConstants.EQUAL) {
- UnacceptableTerminationTimeFaultType fault = new UnacceptableTerminationTimeFaultType();
- fault.setMinimumTime(ct);
- throw new UnacceptableTerminationTimeFault(
- "Invalid termination time",
- fault);
- }
- return tt;
- }
-
- protected XMLGregorianCalendar parseTerminationTime(String value) {
- try {
- Duration d = datatypeFactory.newDuration(value);
- XMLGregorianCalendar c = getCurrentTime();
- c.add(d);
- return c;
- } catch (Exception e) { }
- try {
- Duration d = datatypeFactory.newDurationDayTime(value);
- XMLGregorianCalendar c = getCurrentTime();
- c.add(d);
- return c;
- } catch (Exception e) { }
- try {
- Duration d = datatypeFactory.newDurationYearMonth(value);
- XMLGregorianCalendar c = getCurrentTime();
- c.add(d);
- return c;
- } catch (Exception e) { }
- try {
- return datatypeFactory.newXMLGregorianCalendar(value);
- } catch (Exception e) { }
- return null;
- }
-
- protected XMLGregorianCalendar getCurrentTime() {
- return datatypeFactory.newXMLGregorianCalendar(new GregorianCalendar());
- }
-
- public XMLGregorianCalendar getTerminationTime() {
- return terminationTime;
- }
-
- public void setTerminationTime(XMLGregorianCalendar terminationTime) {
- this.terminationTime = terminationTime;
- }
-
- public void create(Subscribe subscribeRequest) throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault {
- validateSubscription(subscribeRequest);
- start();
- }
-
- protected abstract void start() throws SubscribeCreationFailedFault;
-
- protected abstract void pause() throws PauseFailedFault;
-
- protected abstract void resume() throws ResumeFailedFault;
-
- protected abstract void renew(XMLGregorianCalendar terminationTime) throws UnacceptableTerminationTimeFault;
-
- protected void unsubscribe() throws UnableToDestroySubscriptionFault {
- try {
- unregister();
- } catch (EndpointRegistrationException e) {
- UnableToDestroySubscriptionFaultType fault = new UnableToDestroySubscriptionFaultType();
- throw new UnableToDestroySubscriptionFault("Error unregistering endpoint", fault, e);
- }
- }
-
- protected String createAddress() {
- return "http://servicemix.org/wsnotification/Subscription/" + getName();
- }
-
- protected void validateSubscription(Subscribe subscribeRequest) throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault {
- // Check consumer reference
- consumerReference = subscribeRequest.getConsumerReference();
- // Check terminationTime
- if (subscribeRequest.getInitialTerminationTime() != null &&
- subscribeRequest.getInitialTerminationTime().isNil() == false &&
- subscribeRequest.getInitialTerminationTime().getValue() != null) {
- String strTerminationTime = subscribeRequest.getInitialTerminationTime().getValue();
- terminationTime = validateInitialTerminationTime(strTerminationTime.trim());
- }
- // Check filter
- if (subscribeRequest.getFilter() != null) {
- for (Object f : subscribeRequest.getFilter().getAny()) {
- JAXBElement e = null;
- if (f instanceof JAXBElement) {
- e = (JAXBElement) f;
- f = e.getValue();
- }
- if (f instanceof TopicExpressionType) {
- if (!e.getName().equals(QNAME_TOPIC_EXPRESSION)) {
- InvalidTopicExpressionFaultType fault = new InvalidTopicExpressionFaultType();
- throw new InvalidTopicExpressionFault("Unrecognized TopicExpression: " + e, fault);
- }
- topic = (TopicExpressionType) f;
- } else if (f instanceof QueryExpressionType) {
- if (e != null && e.getName().equals(QNAME_PRODUCER_PROPERTIES)) {
- InvalidProducerPropertiesExpressionFaultType fault = new InvalidProducerPropertiesExpressionFaultType();
- throw new InvalidProducerPropertiesExpressionFault("ProducerProperties are not supported", fault);
- } else if (e != null && e.getName().equals(QNAME_MESSAGE_CONTENT)) {
- if (contentFilter != null) {
- InvalidMessageContentExpressionFaultType fault = new InvalidMessageContentExpressionFaultType();
- throw new InvalidMessageContentExpressionFault("Only one MessageContent filter can be specified", fault);
- }
- contentFilter = (QueryExpressionType) f;
- // Defaults to XPath 1.0
- if (contentFilter.getDialect() == null) {
- contentFilter.setDialect(XPATH1_URI);
- }
- } else {
- InvalidFilterFaultType fault = new InvalidFilterFaultType();
- throw new InvalidFilterFault("Unrecognized filter: " + (e != null ? e.getName() : f), fault);
- }
- } else {
- InvalidFilterFaultType fault = new InvalidFilterFaultType();
- throw new InvalidFilterFault("Unrecognized filter: " + (e != null ? e.getName() : f), fault);
- }
- }
- }
- // Check policy
- if (subscribeRequest.getSubscriptionPolicy() != null) {
- for (Object p : subscribeRequest.getSubscriptionPolicy().getAny()) {
- JAXBElement e = null;
- if (p instanceof JAXBElement) {
- e = (JAXBElement) p;
- p = e.getValue();
- }
- if (p instanceof UseRaw) {
- useRaw = true;
- } else {
- InvalidFilterFaultType fault = new InvalidFilterFaultType();
- throw new InvalidFilterFault("Unrecognized policy: " + p, fault);
- }
- }
- }
- // Check all parameters
- if (consumerReference == null) {
- SubscribeCreationFailedFaultType fault = new SubscribeCreationFailedFaultType();
- throw new SubscribeCreationFailedFault("Invalid ConsumerReference: null", fault);
- }
- // TODO check we can resolve endpoint
- if (topic == null) {
- InvalidFilterFaultType fault = new InvalidFilterFaultType();
- throw new InvalidFilterFault("Must specify a topic to subscribe on", fault);
- }
- if (contentFilter != null && !contentFilter.getDialect().equals(XPATH1_URI)) {
- InvalidMessageContentExpressionFaultType fault = new InvalidMessageContentExpressionFaultType();
- throw new InvalidMessageContentExpressionFault("Unsupported MessageContent dialect: '" + contentFilter.getDialect() + "'", fault);
- }
- if (terminationTime != null) {
- UnacceptableInitialTerminationTimeFaultType fault = new UnacceptableInitialTerminationTimeFaultType();
- throw new UnacceptableInitialTerminationTimeFault(
- "InitialTerminationTime is not supported",
- fault);
- }
- }
-
- public AbstractNotificationBroker getBroker() {
- return broker;
- }
-
- public void setBroker(AbstractNotificationBroker broker) {
- this.broker = broker;
- }
-}
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.wsn;
+
+import java.util.GregorianCalendar;
+
+import javax.jws.WebMethod;
+import javax.jws.WebParam;
+import javax.jws.WebResult;
+import javax.jws.WebService;
+import javax.xml.bind.JAXBElement;
+import javax.xml.datatype.DatatypeConfigurationException;
+import javax.xml.datatype.DatatypeConstants;
+import javax.xml.datatype.DatatypeFactory;
+import javax.xml.datatype.Duration;
+import javax.xml.datatype.XMLGregorianCalendar;
+import javax.xml.namespace.QName;
+
+import org.oasis_open.docs.wsn.b_2.InvalidFilterFaultType;
+import org.oasis_open.docs.wsn.b_2.InvalidMessageContentExpressionFaultType;
+import org.oasis_open.docs.wsn.b_2.InvalidProducerPropertiesExpressionFaultType;
+import org.oasis_open.docs.wsn.b_2.InvalidTopicExpressionFaultType;
+import org.oasis_open.docs.wsn.b_2.PauseSubscription;
+import org.oasis_open.docs.wsn.b_2.PauseSubscriptionResponse;
+import org.oasis_open.docs.wsn.b_2.QueryExpressionType;
+import org.oasis_open.docs.wsn.b_2.Renew;
+import org.oasis_open.docs.wsn.b_2.RenewResponse;
+import org.oasis_open.docs.wsn.b_2.ResumeSubscription;
+import org.oasis_open.docs.wsn.b_2.ResumeSubscriptionResponse;
+import org.oasis_open.docs.wsn.b_2.Subscribe;
+import org.oasis_open.docs.wsn.b_2.SubscribeCreationFailedFaultType;
+import org.oasis_open.docs.wsn.b_2.TopicExpressionType;
+import org.oasis_open.docs.wsn.b_2.UnableToDestroySubscriptionFaultType;
+import org.oasis_open.docs.wsn.b_2.UnacceptableInitialTerminationTimeFaultType;
+import org.oasis_open.docs.wsn.b_2.UnacceptableTerminationTimeFaultType;
+import org.oasis_open.docs.wsn.b_2.Unsubscribe;
+import org.oasis_open.docs.wsn.b_2.UnsubscribeResponse;
+import org.oasis_open.docs.wsn.b_2.UseRaw;
+import org.apache.servicemix.wsn.jaxws.InvalidFilterFault;
+import org.apache.servicemix.wsn.jaxws.InvalidMessageContentExpressionFault;
+import org.apache.servicemix.wsn.jaxws.InvalidProducerPropertiesExpressionFault;
+import org.apache.servicemix.wsn.jaxws.InvalidTopicExpressionFault;
+import org.apache.servicemix.wsn.jaxws.PausableSubscriptionManager;
+import org.apache.servicemix.wsn.jaxws.PauseFailedFault;
+import org.apache.servicemix.wsn.jaxws.ResourceUnknownFault;
+import org.apache.servicemix.wsn.jaxws.ResumeFailedFault;
+import org.apache.servicemix.wsn.jaxws.SubscribeCreationFailedFault;
+import org.apache.servicemix.wsn.jaxws.TopicExpressionDialectUnknownFault;
+import org.apache.servicemix.wsn.jaxws.TopicNotSupportedFault;
+import org.apache.servicemix.wsn.jaxws.UnableToDestroySubscriptionFault;
+import org.apache.servicemix.wsn.jaxws.UnacceptableInitialTerminationTimeFault;
+import org.apache.servicemix.wsn.jaxws.UnacceptableTerminationTimeFault;
+import org.w3._2005._08.addressing.EndpointReferenceType;
+
+@WebService(endpointInterface = "org.apache.servicemix.wsn.jaxws.PausableSubscriptionManager")
+public abstract class AbstractSubscription extends AbstractEndpoint
+ implements PausableSubscriptionManager {
+
+ public static final String WSN_URI = "http://docs.oasis-open.org/wsn/b-2"; // namespace shared by the QNAME_* constants below
+ public static final String XPATH1_URI = "http://www.w3.org/TR/1999/REC-xpath-19991116";
+ public static final QName QNAME_TOPIC_EXPRESSION = new QName(WSN_URI, "TopicExpression");
+ public static final QName QNAME_PRODUCER_PROPERTIES = new QName(WSN_URI, "ProducerProperties");
+ public static final QName QNAME_MESSAGE_CONTENT = new QName(WSN_URI, "MessageContent");
+ public static final QName QNAME_USE_RAW = new QName(WSN_URI, "UseRaw");
+
+ protected DatatypeFactory datatypeFactory; // created in the constructor
+ protected XMLGregorianCalendar terminationTime;
+ protected boolean useRaw;
+ protected TopicExpressionType topic;
+ protected QueryExpressionType contentFilter;
+ protected EndpointReferenceType consumerReference;
+ protected AbstractNotificationBroker broker; // unsubscribe() delegates to this broker
+
+ public AbstractSubscription(String name) { // also creates the DatatypeFactory kept in datatypeFactory
+ super(name);
+ try {
+ this.datatypeFactory = DatatypeFactory.newInstance();
+ } catch (DatatypeConfigurationException e) { // rethrown unchecked with the original cause attached
+ throw new RuntimeException("Unable to initialize subscription", e);
+ }
+ }
+
+ /**
+ * Validates the requested termination time, calls renew(XMLGregorianCalendar), and reports it with the current time.
+ * @param renewRequest request carrying the termination time as a string
+ * @return
+ * returns org.oasis_open.docs.wsn.b_2.RenewResponse
+ * @throws UnacceptableTerminationTimeFault
+ * @throws ResourceUnknownFault
+ */
+ @WebMethod(operationName = "Renew")
+ @WebResult(name = "RenewResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "RenewResponse")
+ public RenewResponse renew(
+ @WebParam(name = "Renew", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "RenewRequest")
+ Renew renewRequest)
+ throws ResourceUnknownFault, UnacceptableTerminationTimeFault {
+
+ XMLGregorianCalendar terminationTime = validateTerminationTime(renewRequest.getTerminationTime()); // local; shadows the terminationTime field, which is not assigned in this method
+ renew(terminationTime);
+ RenewResponse response = new RenewResponse();
+ response.setTerminationTime(terminationTime);
+ response.setCurrentTime(getCurrentTime());
+ return response;
+ }
+
+ /**
+  * Handles the Unsubscribe operation by delegating to the broker's
+  * {@code unsubscribe} with this endpoint's address. The request object
+  * itself is not inspected.
+  *
+  * @param unsubscribeRequest the incoming request (unused)
+  * @return a new, empty UnsubscribeResponse
+  * @throws UnableToDestroySubscriptionFault declared by the operation
+  * @throws ResourceUnknownFault declared by the operation
+  */
+ @WebMethod(operationName = "Unsubscribe")
+ @WebResult(name = "UnsubscribeResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "UnsubscribeResponse")
+ public UnsubscribeResponse unsubscribe(
+     @WebParam(name = "Unsubscribe", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "UnsubscribeRequest")
+     Unsubscribe unsubscribeRequest)
+     throws ResourceUnknownFault, UnableToDestroySubscriptionFault {
+
+     broker.unsubscribe(getAddress());
+     final UnsubscribeResponse reply = new UnsubscribeResponse();
+     return reply;
+ }
+
+ /**
+  * Handles the PauseSubscription operation by calling {@link #pause()}.
+  * The request object itself is not inspected.
+  *
+  * @param pauseSubscriptionRequest the incoming request (unused)
+  * @return a new, empty PauseSubscriptionResponse
+  * @throws PauseFailedFault if {@link #pause()} fails
+  * @throws ResourceUnknownFault declared by the operation; not thrown by the code in this method
+  */
+ @WebMethod(operationName = "PauseSubscription")
+ @WebResult(name = "PauseSubscriptionResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "PauseSubscriptionResponse")
+ public PauseSubscriptionResponse pauseSubscription(
+     @WebParam(name = "PauseSubscription", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "PauseSubscriptionRequest")
+     PauseSubscription pauseSubscriptionRequest)
+     throws PauseFailedFault, ResourceUnknownFault {
+
+     pause();
+     final PauseSubscriptionResponse reply = new PauseSubscriptionResponse();
+     return reply;
+ }
+
+ /**
+  * Handles the ResumeSubscription operation by calling {@link #resume()}.
+  * The request object itself is not inspected.
+  *
+  * @param resumeSubscriptionRequest the incoming request (unused)
+  * @return a new, empty ResumeSubscriptionResponse
+  * @throws ResumeFailedFault if {@link #resume()} fails
+  * @throws ResourceUnknownFault declared by the operation; not thrown by the code in this method
+  */
+ @WebMethod(operationName = "ResumeSubscription")
+ @WebResult(name = "ResumeSubscriptionResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "ResumeSubscriptionResponse")
+ public ResumeSubscriptionResponse resumeSubscription(
+     @WebParam(name = "ResumeSubscription", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "ResumeSubscriptionRequest")
+     ResumeSubscription resumeSubscriptionRequest)
+     throws ResourceUnknownFault, ResumeFailedFault {
+
+     resume();
+     final ResumeSubscriptionResponse reply = new ResumeSubscriptionResponse();
+     return reply;
+ }
+
+ protected XMLGregorianCalendar validateInitialTerminationTime(String value) throws UnacceptableInitialTerminationTimeFault {
+ XMLGregorianCalendar tt = parseTerminationTime(value);
+ if (tt == null) {
+ UnacceptableInitialTerminationTimeFaultType fault = new UnacceptableInitialTerminationTimeFaultType();
+ throw new UnacceptableInitialTerminationTimeFault(
+ "Unable to parse initial termination time: '" + value + "'",
+ fault);
+ }
+ XMLGregorianCalendar ct = getCurrentTime();
+ int c = tt.compare(ct);
+ if (c == DatatypeConstants.LESSER || c == DatatypeConstants.EQUAL) {
+ UnacceptableInitialTerminationTimeFaultType fault = new UnacceptableInitialTerminationTimeFaultType();
+ fault.setMinimumTime(ct);
+ throw new UnacceptableInitialTerminationTimeFault(
+ "Invalid initial termination time",
+ fault);
+ }
+ return tt;
+ }
+
+ protected XMLGregorianCalendar validateTerminationTime(String value) throws UnacceptableTerminationTimeFault {
+ XMLGregorianCalendar tt = parseTerminationTime(value);
+ if (tt == null) {
+ UnacceptableTerminationTimeFaultType fault = new UnacceptableTerminationTimeFaultType();
+ throw new UnacceptableTerminationTimeFault(
+ "Unable to parse termination time: '" + value + "'",
+ fault);
+ }
+ XMLGregorianCalendar ct = getCurrentTime();
+ int c = tt.compare(ct);
+ if (c == DatatypeConstants.LESSER || c == DatatypeConstants.EQUAL) {
+ UnacceptableTerminationTimeFaultType fault = new UnacceptableTerminationTimeFaultType();
+ fault.setMinimumTime(ct);
+ throw new UnacceptableTerminationTimeFault(
+ "Invalid termination time",
+ fault);
+ }
+ return tt;
+ }
+
+ /**
+  * Converts a textual termination time into a calendar value.
+  * The text is tried, in order, as a general duration, a day/time
+  * duration and a year/month duration (each added to the current time),
+  * and finally as an absolute date/time. Any exception raised by one
+  * attempt is discarded and the next attempt is made.
+  *
+  * @param value the text to interpret
+  * @return the resulting time, or null if no interpretation succeeded
+  */
+ protected XMLGregorianCalendar parseTerminationTime(String value) {
+     try {
+         return offsetFromNow(datatypeFactory.newDuration(value));
+     } catch (Exception ignored) {
+         // fall through to the next interpretation
+     }
+     try {
+         return offsetFromNow(datatypeFactory.newDurationDayTime(value));
+     } catch (Exception ignored) {
+         // fall through to the next interpretation
+     }
+     try {
+         return offsetFromNow(datatypeFactory.newDurationYearMonth(value));
+     } catch (Exception ignored) {
+         // fall through to the next interpretation
+     }
+     try {
+         return datatypeFactory.newXMLGregorianCalendar(value);
+     } catch (Exception ignored) {
+         // nothing left to try
+     }
+     return null;
+ }
+
+ /** Returns the current time advanced by the given duration. */
+ private XMLGregorianCalendar offsetFromNow(Duration offset) {
+     XMLGregorianCalendar moment = getCurrentTime();
+     moment.add(offset);
+     return moment;
+ }
+
+ protected XMLGregorianCalendar getCurrentTime() {
+ return datatypeFactory.newXMLGregorianCalendar(new GregorianCalendar());
+ }
+
+ /**
+  * @return the termination time currently stored on this subscription, possibly null
+  */
+ public XMLGregorianCalendar getTerminationTime() {
+     return this.terminationTime;
+ }
+
+ /**
+  * Stores the given termination time on this subscription without validating it.
+  *
+  * @param terminationTime the new termination time
+  */
+ public void setTerminationTime(XMLGregorianCalendar terminationTime) {
+ this.terminationTime = terminationTime;
+ }
+
+ public void create(Subscribe subscribeRequest) throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault {
+ validateSubscription(subscribeRequest);
+ start();
+ }
+
+ /** Called by create once the Subscribe request has passed validation. */
+ protected abstract void start() throws SubscribeCreationFailedFault;
+
+ /** Called by pauseSubscription to carry out the pause. */
+ protected abstract void pause() throws PauseFailedFault;
+
+ /** Called by resumeSubscription to carry out the resume. */
+ protected abstract void resume() throws ResumeFailedFault;
+
+ /** Called by the Renew operation with the termination time that passed validateTerminationTime. */
+ protected abstract void renew(XMLGregorianCalendar terminationTime) throws UnacceptableTerminationTimeFault;
+
+ /**
+  * Unregisters this endpoint.
+  *
+  * @throws UnableToDestroySubscriptionFault wrapping the EndpointRegistrationException
+  *         raised by {@code unregister()}
+  */
+ protected void unsubscribe() throws UnableToDestroySubscriptionFault {
+     try {
+         unregister();
+     } catch (EndpointRegistrationException cause) {
+         throw new UnableToDestroySubscriptionFault(
+                 "Error unregistering endpoint",
+                 new UnableToDestroySubscriptionFaultType(),
+                 cause);
+     }
+ }
+
+ protected String createAddress() {
+ return "http://servicemix.org/wsnotification/Subscription/" + getName();
+ }
+
+ /**
+  * Validates a Subscribe request and copies its settings (consumer reference,
+  * termination time, topic, content filter, raw flag) into this subscription.
+  *
+  * @param subscribeRequest the request to validate
+  * @throws InvalidTopicExpressionFault if a topic expression is wrapped in an element
+  *         whose name is not {@link #QNAME_TOPIC_EXPRESSION}
+  * @throws InvalidProducerPropertiesExpressionFault if a ProducerProperties filter is present
+  * @throws InvalidMessageContentExpressionFault if more than one MessageContent filter is
+  *         given, or its dialect is not {@link #XPATH1_URI}
+  * @throws InvalidFilterFault if a filter or policy entry is not recognized, or no topic is given
+  * @throws SubscribeCreationFailedFault if the consumer reference is null
+  * @throws UnacceptableInitialTerminationTimeFault if an initial termination time is given;
+  *         this implementation rejects it even when it parses and lies in the future
+  * @throws TopicExpressionDialectUnknownFault declared; not thrown by the code in this method
+  * @throws TopicNotSupportedFault declared; not thrown by the code in this method
+  */
+ protected void validateSubscription(Subscribe subscribeRequest) throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault {
+     // Check consumer reference
+     consumerReference = subscribeRequest.getConsumerReference();
+     // Check terminationTime
+     if (subscribeRequest.getInitialTerminationTime() != null &&
+             !subscribeRequest.getInitialTerminationTime().isNil() &&
+             subscribeRequest.getInitialTerminationTime().getValue() != null) {
+         String strTerminationTime = subscribeRequest.getInitialTerminationTime().getValue();
+         terminationTime = validateInitialTerminationTime(strTerminationTime.trim());
+     }
+     // Check filter
+     if (subscribeRequest.getFilter() != null) {
+         for (Object f : subscribeRequest.getFilter().getAny()) {
+             // Entries may arrive wrapped in a JAXBElement carrying the element name.
+             JAXBElement<?> e = null;
+             if (f instanceof JAXBElement) {
+                 e = (JAXBElement<?>) f;
+                 f = e.getValue();
+             }
+             if (f instanceof TopicExpressionType) {
+                 // An unwrapped TopicExpressionType has no element name to check;
+                 // it is accepted instead of failing with a NullPointerException.
+                 if (e != null && !e.getName().equals(QNAME_TOPIC_EXPRESSION)) {
+                     InvalidTopicExpressionFaultType fault = new InvalidTopicExpressionFaultType();
+                     throw new InvalidTopicExpressionFault("Unrecognized TopicExpression: " + e, fault);
+                 }
+                 topic = (TopicExpressionType) f;
+             } else if (f instanceof QueryExpressionType) {
+                 if (e != null && e.getName().equals(QNAME_PRODUCER_PROPERTIES)) {
+                     InvalidProducerPropertiesExpressionFaultType fault = new InvalidProducerPropertiesExpressionFaultType();
+                     throw new InvalidProducerPropertiesExpressionFault("ProducerProperties are not supported", fault);
+                 } else if (e != null && e.getName().equals(QNAME_MESSAGE_CONTENT)) {
+                     if (contentFilter != null) {
+                         InvalidMessageContentExpressionFaultType fault = new InvalidMessageContentExpressionFaultType();
+                         throw new InvalidMessageContentExpressionFault("Only one MessageContent filter can be specified", fault);
+                     }
+                     contentFilter = (QueryExpressionType) f;
+                     // Defaults to XPath 1.0
+                     if (contentFilter.getDialect() == null) {
+                         contentFilter.setDialect(XPATH1_URI);
+                     }
+                 } else {
+                     InvalidFilterFaultType fault = new InvalidFilterFaultType();
+                     throw new InvalidFilterFault("Unrecognized filter: " + (e != null ? e.getName() : f), fault);
+                 }
+             } else {
+                 InvalidFilterFaultType fault = new InvalidFilterFaultType();
+                 throw new InvalidFilterFault("Unrecognized filter: " + (e != null ? e.getName() : f), fault);
+             }
+         }
+     }
+     // Check policy
+     if (subscribeRequest.getSubscriptionPolicy() != null) {
+         for (Object p : subscribeRequest.getSubscriptionPolicy().getAny()) {
+             if (p instanceof JAXBElement) {
+                 p = ((JAXBElement<?>) p).getValue();
+             }
+             if (p instanceof UseRaw) {
+                 useRaw = true;
+             } else {
+                 InvalidFilterFaultType fault = new InvalidFilterFaultType();
+                 throw new InvalidFilterFault("Unrecognized policy: " + p, fault);
+             }
+         }
+     }
+     // Check all parameters
+     if (consumerReference == null) {
+         SubscribeCreationFailedFaultType fault = new SubscribeCreationFailedFaultType();
+         throw new SubscribeCreationFailedFault("Invalid ConsumerReference: null", fault);
+     }
+     // TODO check we can resolve endpoint
+     if (topic == null) {
+         InvalidFilterFaultType fault = new InvalidFilterFaultType();
+         throw new InvalidFilterFault("Must specify a topic to subscribe on", fault);
+     }
+     if (contentFilter != null && !contentFilter.getDialect().equals(XPATH1_URI)) {
+         InvalidMessageContentExpressionFaultType fault = new InvalidMessageContentExpressionFaultType();
+         throw new InvalidMessageContentExpressionFault("Unsupported MessageContent dialect: '" + contentFilter.getDialect() + "'", fault);
+     }
+     // Any initial termination time that parsed successfully is still refused here;
+     // the terminationTime field keeps the parsed value when this fault is thrown.
+     if (terminationTime != null) {
+         UnacceptableInitialTerminationTimeFaultType fault = new UnacceptableInitialTerminationTimeFaultType();
+         throw new UnacceptableInitialTerminationTimeFault(
+                 "InitialTerminationTime is not supported",
+                 fault);
+     }
+ }
+
+ /**
+  * @return the broker assigned to this subscription, or null if none has been set
+  */
+ public AbstractNotificationBroker getBroker() {
+     return this.broker;
+ }
+
+ /**
+  * Assigns the broker that the Unsubscribe operation delegates to.
+  *
+  * @param broker the broker to use
+  */
+ public void setBroker(AbstractNotificationBroker broker) {
+ this.broker = broker;
+ }
+}
Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractSubscription.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/EndpointManager.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/EndpointManager.java?rev=379627&r1=379626&r2=379627&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/EndpointManager.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/EndpointManager.java Tue Feb 21 15:40:05 2006
@@ -1,25 +1,25 @@
-/*
- * Copyright 2005-2006 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicemix.wsn;
-
-public interface EndpointManager {
-
- Object register(String address,
- Object service) throws EndpointRegistrationException;
-
- void unregister(Object endpoint) throws EndpointRegistrationException;
-
-}
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.wsn;
+
+/** Contract for registering a service object under an address and unregistering an endpoint. */
+public interface EndpointManager {
+    /** Registers {@code service} under {@code address}; the result is presumably what {@link #unregister(Object)} expects (confirm against implementations). */
+    Object register(String address, Object service) throws EndpointRegistrationException;
+
+    /** Unregisters the given endpoint. */
+    void unregister(Object endpoint) throws EndpointRegistrationException;
+}
Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/EndpointManager.java
------------------------------------------------------------------------------
svn:eol-style = native